@botnet_hunter's blog
Detecting shellcode over the network with scapy and libemu

In this blog post, I describe a method of using libemu along with scapy to detect shellcode being sent across the network over unencrypted channels. Without a significant amount of computing power, this method is not practical for large networks. On the other hand, for semi-automated analysis, or even integration into something like Cuckoo Sandbox, it may be quite useful.

While this proof of concept is currently functional, it would take some work on the project’s dependencies in order for me to consider the project a success. I have not yet decided if I plan on dedicating significant time to resolving issues in the related projects.

Network Detection of Shellcode

In order to detect this shellcode, we need a few major components.

  • Method to detect shellcode in a buffer
  • Method to acquire network packets
  • Method to reassemble socket communication over the network

For the first requirement, we simply use libemu.

For the remaining requirements, we use scapy and some additional Python code.

libemu

Libemu is a library for detecting and emulating shellcode, allowing for some automated analysis of shellcode. Unfortunately, it does not appear to be under active development, and up-to-date source code is currently eluding me. It also appears to have a few bugs, some of which make it less viable for integration into network perimeter defenses.

Along with the source being difficult to locate, both of the current Python interfaces to libemu seem to have some issues. While python-libemu and pylibemu are both very useful, python-libemu fails to implement many of libemu’s features, and pylibemu currently has some issues which seriously impact memory usage.

I have found that even via the Python interfaces, an arbitrary buffer supplied to libemu for processing can lead to a floating point error, which will crash the process. This is one reason I do not suggest running this in production environments, or on large enterprise networks.

For our purposes, we’ll use python-libemu, installed on Ubuntu with the following:

sudo apt-get install python-libemu

scapy

Scapy is well known as a raw networking library for Python, and for being pretty awesome. I’m not going to spend time covering Scapy here.

sudo pip install scapy

Analysis

In order to scan the data the same way the attacker sends it, we need to reassemble the socket streams. More specifically, we need to track each direction of communication separately. We also don’t want all of every stream in memory at the same time, so we use an LRUCache (from cachetools) to keep only the most recently used streams in memory, and a memory buffering technique called a sliding window to cap the size of each one.
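To make the per-direction bookkeeping concrete, here is a minimal sketch that uses only the standard library. SimpleLRU is a hypothetical stand-in for cachetools' LRUCache, and direction_key is a hypothetical helper that builds the same kind of key the PoC uses. Neither is the PoC's actual code.

```python
from collections import OrderedDict


class SimpleLRU(object):
    """Tiny LRU mapping; a hypothetical stand-in for cachetools.LRUCache."""

    def __init__(self, maxsize):
        self.maxsize = maxsize
        self._data = OrderedDict()

    def __contains__(self, key):
        return key in self._data

    def __len__(self):
        return len(self._data)

    def __getitem__(self, key):
        # Pop and re-insert so the key becomes the most recently used
        value = self._data.pop(key)
        self._data[key] = value
        return value

    def __setitem__(self, key, value):
        self._data.pop(key, None)
        self._data[key] = value
        while len(self._data) > self.maxsize:
            # Evict the least recently used stream
            self._data.popitem(last=False)


def direction_key(proto, src, sport, dst, dport):
    """One key per direction, so A->B and B->A are buffered separately."""
    return "{0}://{1}:{2}-{3}:{4}".format(proto, src, sport, dst, dport)
```

Because source and destination are part of the key, the request and response halves of a connection end up in two different buffers, and a stream that has been quiet for a long time is the first to be dropped when the cache is full.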

In a sliding window, we set the max amount of memory we want to use per stream, then as we get new data, we append the new data, then cut off bytes at the beginning of our buffer until it is no larger than our max buffer size. This way shellcode that spans packet boundaries is still visible to the scanner, which would not be the case if we simply looked at each packet on its own.
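The sliding window itself is only a couple of lines. The sketch below uses a hypothetical helper name, append_to_window, and a deliberately tiny window so the effect is easy to see. The PoC does the same thing inline with a 1 MiB limit.

```python
def append_to_window(window, new_data, max_size):
    """Append new_data, then keep only the newest max_size bytes."""
    if max_size <= 0:
        raise ValueError("max_size must be positive")
    window += new_data
    # A negative slice start keeps the tail and drops the oldest bytes
    return window[-max_size:]


# Two payloads whose interesting bytes straddle the packet boundary
window = b""
for payload in (b"AAAA\x90\x90", b"\x90\x90BBBB"):
    window = append_to_window(window, payload, max_size=8)
```

After the second payload the window holds the four 0x90 bytes as one contiguous run, even though no single packet contained more than two of them.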

For each new packet we get, we scan the sliding window buffer with libemu. If libemu detects shellcode, we print the offset and the quad (ip:port-ip:port).
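The scan-and-report step can be sketched as follows. To keep the example runnable without libemu, find_nop_sled is a crude hypothetical stand-in detector that only looks for a run of 0x90 bytes. libemu's emulation-based detection is considerably more involved. The only thing the sketch assumes about a detector is that it returns a non-negative offset on a hit, which is how the PoC treats the result of the libemu test.

```python
def find_nop_sled(buf, min_len=8):
    """Stand-in detector (NOT libemu): offset of the first run of
    min_len 0x90 bytes, or -1 if there is none."""
    return buf.find(b"\x90" * min_len)


def scan_window(key, window, detector=find_nop_sled):
    """Run the detector over a stream's window and format a report line.

    Returns None when nothing is detected.
    """
    offset = detector(window)
    if offset is not None and offset >= 0:
        return "{0} - offset {1}".format(key, offset)
    return None
```

Note that the reported offset is relative to the start of the current window, not to the start of the stream, since older bytes may already have been trimmed.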

Code

I will not update the code in this blog post, but if I need to, I will update the code in the following gist.

Network Shellcode Detection Proof of Concept

In the proof of concept, shellcode detection is spread across a pool of worker threads, one per CPU core on the local computer. This is done because libemu scanning can be quite CPU intensive and slow.
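Because scanning is slow, it helps to cap how much work can pile up in front of the workers. The sketch below shows the general idea with plain threads and a bounded queue, where the producer blocks on put() whenever the workers fall behind. It is an illustration only, not the PoC's code (which subclasses ThreadPool), and run_bounded and its parameters are hypothetical names.

```python
import threading

try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2


def run_bounded(jobs, worker, threads=4, max_queue_size=200):
    """Run worker(job) for every job on a fixed number of threads.

    The work queue is bounded, so put() blocks while it is full and
    memory use stays capped. Results are returned in completion order.
    """
    work = queue.Queue(max_queue_size)
    results = []
    lock = threading.Lock()

    def loop():
        while True:
            job = work.get()
            if job is None:            # sentinel: no more work
                break
            try:
                out = worker(job)
            except Exception as exc:   # keep the thread alive on errors
                out = exc
            with lock:
                results.append(out)

    pool = [threading.Thread(target=loop) for _ in range(threads)]
    for t in pool:
        t.start()
    for job in jobs:
        work.put(job)                  # blocks while the queue is full
    for _ in pool:
        work.put(None)                 # one sentinel per worker
    for t in pool:
        t.join()
    return results
```

For example, run_bounded(buffers, scan, threads=cpu_count(), max_queue_size=200) would never hold more than 200 pending buffers, where buffers and scan are whatever iterable and scan function you supply.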

The PoC requires the following commands to be run before it can be used.

sudo apt-get install python-libemu python-pip
sudo pip install scapy
sudo pip install cachetools

The code:

#!/usr/bin/env python2
import logging

logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
from multiprocessing.pool import ThreadPool
from multiprocessing import cpu_count
import libemu
from cachetools import LRUCache
from scapy.layers.all import TCP, UDP, Raw
from scapy.all import sniff
import Queue


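# Keep at most 1024 one-directional streams; least recently used are evicted
stream_cache = LRUCache(1024)
# Maximum number of bytes kept per stream (the sliding window size)
BUFFER_SIZE = 1024 * 1024
# When False, each packet payload is scanned on its own
USE_SLIDING_WINDOW = True
# One worker thread per CPU core
THREADS = cpu_count()
# Created in __main__ once arguments have been parsed
thread_pool = None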


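class LimitedPool(ThreadPool):
    """ThreadPool whose internal input queue is capped at max_queue_size."""

    def __init__(self, processes=None, initializer=None, initargs=(), max_queue_size=10000):
        # Must be set before ThreadPool.__init__, which calls _setup_queues()
        self._max_queue_size = max_queue_size
        ThreadPool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        # Overrides a private ThreadPool hook to swap in a bounded input queue
        self._inqueue = Queue.Queue(self._max_queue_size)
        self._outqueue = Queue.Queue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get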


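def generate_key(packet):
    # One key per direction: proto://src_ip:src_port-dst_ip:dst_port.
    # Assumes the outer layer's payload is IP and that its payload is TCP or UDP.
    k = "{0}:{1}-{2}:{3}".format(packet.payload.src, packet.payload.payload.sport, packet.payload.dst, packet.payload.payload.dport)
    if packet.haslayer(TCP):
        return "tcp://{0}".format(k)
    if packet.haslayer(UDP):
        return "udp://{0}".format(k)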


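def test_for_shellcode((key, packet, file_path)):
    # 'packet' is the buffer to scan: the sliding window or a single payload
    e = libemu.Emulator()
    r = e.test(packet)
    # A non-negative result is the offset of the suspected shellcode in the buffer
    if r is not None and r >= 0:
        logging.warning("{2}: {0} - {1}".format(key, {"offset": r}, file_path))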


def process_packet(pkt, file_path):
    global thread_pool
    try:
        if pkt.haslayer(Raw) and len(pkt[Raw].original) > 0:
            k = generate_key(pkt)
            if USE_SLIDING_WINDOW:
                if k in stream_cache:
                    stream_cache[k] += pkt[Raw].original
                    stream_cache[k] = stream_cache[k][-BUFFER_SIZE:]
                else:
                    stream_cache[k] = pkt[Raw].original[-BUFFER_SIZE:]
                p = stream_cache[k]
            else:
                p = pkt[Raw].original

            #test_for_shellcode((k, p, file_path))
            thread_pool.apply_async(test_for_shellcode, [(k, p, file_path)])
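    except KeyboardInterrupt:
        raise
    except Exception as e:
        # A packet we cannot handle should not stop the capture, but leave a trace
        logging.debug("Failed to process packet: {0}".format(e))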


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    from argparse import ArgumentParser

    parser = ArgumentParser(
        prog=__file__,
        description="Proof of concept for shellcode identification in pcap or network",
        version="%(prog)s v0.0.1 by Brian Wallace (@botnet_hunter)",
        epilog="%(prog)s v0.0.1 by Brian Wallace (@botnet_hunter)"
    )
    parser.add_argument('path', metavar='path', type=str, nargs='*', default=None, help="Paths to files to parse")

    args = parser.parse_args()

    thread_pool = LimitedPool(processes=THREADS, max_queue_size=200)

    if args.path is None or len(args.path) == 0:
        print "Live sniffing"
        sniff(store=0, filter="tcp or udp", prn=lambda x: process_packet(x, "live"))
    else:
        for p in args.path:
            logging.warning("Reading packets from {0}".format(p))
            sniff(store=0, filter="tcp or udp", offline=p, prn=lambda x: process_packet(x, p))

    thread_pool.close()
    thread_pool.join()

[Image: Network shellcode detection PoC working]

11 Oct 2015 #network #python #project