Слияние кода завершено, страница обновится автоматически
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2015 Qing Liang (https://github.com/liangqing)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from __future__ import absolute_import
from __future__ import unicode_literals
from __future__ import division
from io import open
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import os
import zlib
import hashlib
from struct import pack, unpack
from time import time
from io import BytesIO
from .filetree import FileEntry
class InvalidKey(Exception):
    """Error type for an invalid encryption key.

    Nothing in the visible part of this module raises it; it is presumably
    used by callers elsewhere in the package (TODO confirm).
    """
class DecryptError(Exception):
    """Raised when an encrypted stream cannot be decoded.

    Covers a truncated header or pathname, an undecodable pathname, a
    decompression failure and a digest mismatch.
    """
class VersionNotCompatible(Exception):
    """Raised when a stream's version byte is newer than this code supports."""
class Crypto(object):
    """Password-based AES-CBC encryption of a file plus its metadata.

    Encrypted stream layout (see ``encrypt_fd``)::

        +-----------------------------------------------------+
        | Version(1) | Flags(1) | Pathname size(2) | Salt(12) |
        +-----------------------------------------------------+
        |                 Encrypted Pathname                  |
        +-----------------------------------------------------+
        |                 Encrypted Content                   |
        |                        ...                          |
        +-----------------------------------------------------+
        |            Encrypted Content Digest(16)             |
        +-----------------------------------------------------+
        |       size(8)*     |    mtime(4)    |    mode(4)    |
        +-----------------------------------------------------+
        |            Encrypted Entire Digest(16)              |
        +-----------------------------------------------------+
        * size, mtime, mode are also encrypted
    """

    # Format version written to (and the highest version accepted from)
    # the first header byte.
    VERSION = 0x1
    # Flag bit: the content is zlib-compressed before encryption.
    COMPRESS = 0x1
    # Number of bytes requested per read() call.
    BUFFER_SIZE = 1024 * 16
    # content digest(16) + size(8) + mtime(4) + mode(4) + entire digest(16)
    _FOOTER_SIZE = 48

    def __init__(self, password, key_size=32):
        """Store the password (UTF-8 encoded) and the cipher parameters.

        :param password: text password; bytes are accepted and used as-is.
        :param key_size: AES key length in bytes.
        """
        if not isinstance(password, bytes):
            password = password.encode("utf-8")
        self.password = password
        self.key_size = key_size
        self.block_size = 16

    def encrypt_file(self, plain_path, encrypted_path, plain_file_entry,
                     flags=0):
        """Encrypt the file at ``plain_path`` into ``encrypted_path``.

        :param plain_file_entry: metadata entry passed on to ``encrypt_fd``.
        :param flags: format flags passed on to ``encrypt_fd``.
        :return: the file entry returned by ``encrypt_fd``.
        """
        # ``with`` guarantees both handles are closed even if encryption
        # raises part-way through.
        with open(plain_path, 'rb') as plain_fd:
            with open(encrypted_path, 'wb') as encrypted_fd:
                return self.encrypt_fd(plain_fd, encrypted_fd,
                                       plain_file_entry, flags)

    def decrypt_file(self, encrypted_path, plain_path):
        """Decrypt ``encrypted_path`` into ``plain_path``.

        :return: the file entry rebuilt by ``decrypt_fd``.
        :raises DecryptError: see ``decrypt_fd``.
        :raises VersionNotCompatible: see ``decrypt_fd``.
        """
        # Open the input first so that a missing or unreadable input does
        # not create or truncate the output file.
        with open(encrypted_path, 'rb') as encrypted_fd:
            with open(plain_path, 'wb') as plain_fd:
                return self.decrypt_fd(encrypted_fd, plain_fd)

    @staticmethod
    def compress_fd(in_fd, out_fd):
        """Copy ``in_fd`` to ``out_fd``, zlib-compressing the data."""
        compress_obj = zlib.compressobj()
        while True:
            data = in_fd.read(Crypto.BUFFER_SIZE)
            if not data:
                break
            out_fd.write(compress_obj.compress(data))
        out_fd.write(compress_obj.flush())

    @staticmethod
    def decompress_fd(in_fd, out_fd):
        """Copy ``in_fd`` to ``out_fd``, zlib-decompressing the data."""
        decompress_obj = zlib.decompressobj()
        while True:
            data = in_fd.read(Crypto.BUFFER_SIZE)
            if not data:
                break
            out_fd.write(decompress_obj.decompress(data))
        out_fd.write(decompress_obj.flush())

    def gen_key_and_iv(self, salt):
        """Derive the AES key and the CBC IV from the password and ``salt``.

        Each round computes ``MD5(previous_digest + password + salt)`` and
        the digests are concatenated until ``key_size + block_size`` bytes
        are available.

        NOTE(review): a single-pass MD5 derivation is weak by current
        standards, but it is part of the file format; changing it would make
        existing encrypted files unreadable.

        :return: ``(key, iv)`` as a tuple of byte strings.
        """
        wanted = self.key_size + self.block_size
        material = last = b''
        while len(material) < wanted:
            last = hashlib.md5(last + self.password + salt).digest()
            material += last
        return material[:self.key_size], material[self.key_size:wanted]

    @staticmethod
    def _build_footer(file_entry):
        """Pack content digest, size, mtime and mode into the 32-byte footer."""
        return file_entry.digest + \
            pack(b'!Q', file_entry.size) + \
            pack(b'!I', int(file_entry.mtime)) + \
            pack(b'!i', file_entry.mode)

    @staticmethod
    def _unpack_footer(pathname, footer):
        """Rebuild a ``FileEntry`` from ``pathname`` and a 32-byte footer."""
        (size, mtime, mode) = unpack(b'!QIi', footer[16:32])
        return FileEntry(pathname, size, None, mtime, mode,
                         footer[:16], False)

    @staticmethod
    def _iter_payload(in_fd, md5, compress_obj):
        """Yield the bytes to encrypt, feeding the raw input to ``md5``.

        When ``compress_obj`` is given, the yielded data is the compressed
        stream; the digest is always computed over the uncompressed input.
        """
        while True:
            data = in_fd.read(Crypto.BUFFER_SIZE)
            if not data:
                break
            md5.update(data)
            if compress_obj is not None:
                data = compress_obj.compress(data)
            if data:
                yield data
        if compress_obj is not None:
            # Flush exactly once: flushing an already finished compressor
            # raises zlib.error.
            tail = compress_obj.flush()
            if tail:
                yield tail

    @staticmethod
    def _write_plain(data, decompress_obj, md5, out_fd):
        """Decompress ``data`` if needed, then hash it and write it out."""
        if decompress_obj is not None:
            try:
                data = decompress_obj.decompress(data)
            except zlib.error:
                raise DecryptError()
        md5.update(data)
        out_fd.write(data)

    def encrypt_fd(self, in_fd, out_fd, file_entry, flags=0):
        """Encrypt ``in_fd`` into ``out_fd`` using the class-level layout.

        :param in_fd: binary readable stream with the plain content.
        :param out_fd: binary writable stream receiving the encrypted data.
        :param file_entry: metadata entry; a temporary one is created when
            ``None``. Its ``salt`` is generated when unset and its ``digest``
            is set to the MD5 of the plain content.
        :param flags: format flags; only the low byte is stored. Set
            ``Crypto.COMPRESS`` to zlib-compress the content.
        :return: the (updated) file entry.
        """
        bs = self.block_size
        if file_entry is None:
            file_entry = FileEntry('file_entry.tmp', 0, time(), time(), 0)
        if file_entry.salt is None:
            # The salt fills the 16-byte header after the 4 fixed bytes.
            file_entry.salt = os.urandom(bs - 4)
        key, iv = self.gen_key_and_iv(file_entry.salt)
        encryptor = Cipher(algorithms.AES(key), modes.CBC(iv),
                           backend=default_backend()).encryptor()

        pathname = file_entry.pathname.encode("utf-8")
        if len(pathname) > 0xFFFF:
            # The length field is an unsigned 16-bit integer. Cut on a
            # character boundary so the stored pathname stays valid UTF-8.
            pathname = pathname[:0xFFFF].decode("utf-8", "ignore") \
                .encode("utf-8")
        pathname_size = len(pathname)
        # Zero-pad the pathname up to a whole number of cipher blocks.
        pathname_padding = (-pathname_size % bs) * b'\0'

        flags &= 0xFF
        out_fd.write(pack(b'BB', self.VERSION, flags))
        out_fd.write(pack(b'!H', pathname_size))
        out_fd.write(file_entry.salt)
        out_fd.write(encryptor.update(pathname + pathname_padding))

        compress_obj = None
        if flags & Crypto.COMPRESS:
            compress_obj = zlib.compressobj()

        md5 = hashlib.md5()
        pending = b''
        for data in self._iter_payload(in_fd, md5, compress_obj):
            pending += data
            aligned = len(pending) - len(pending) % bs
            if aligned:
                out_fd.write(encryptor.update(pending[:aligned]))
                pending = pending[aligned:]
        # Fewer than ``bs`` bytes are left, so the padding is 1..bs bytes,
        # each holding the padding length (a full block when none is left).
        padding_length = bs - len(pending)
        out_fd.write(encryptor.update(
            pending + padding_length * pack(b'B', padding_length)))

        file_entry.digest = md5.digest()
        footer = self._build_footer(file_entry)
        # The "entire digest" covers the plain content plus the footer.
        md5.update(footer)
        out_fd.write(encryptor.update(footer))
        out_fd.write(encryptor.update(md5.digest()))
        out_fd.write(encryptor.finalize())
        return file_entry

    def decrypt_fd(self, in_fd, out_fd):
        """Decrypt ``in_fd`` into ``out_fd`` and rebuild the file entry.

        :param in_fd: binary readable stream produced by ``encrypt_fd``.
        :param out_fd: binary writable stream receiving the plain content.
        :return: a ``FileEntry`` built from the stored pathname and footer,
            with ``salt`` set to the salt read from the header.
        :raises VersionNotCompatible: the version byte exceeds ``VERSION``.
        :raises DecryptError: the stream is truncated or malformed, the
            content cannot be decompressed, or a digest does not match.
        """
        bs = self.block_size
        header = in_fd.read(bs)
        if len(header) < bs:
            raise DecryptError(
                "header line size is not correct, expect %d, got %d" %
                (bs, len(header)))
        # bytearray yields integers on both Python 2 and Python 3.
        version, flags = bytearray(header[:2])
        if version > self.VERSION:
            raise VersionNotCompatible("Unrecognized version: (%d)" % version)
        (pathname_size,) = unpack(b'!H', header[2:4])
        salt = header[4:]
        key, iv = self.gen_key_and_iv(salt)
        decryptor = Cipher(algorithms.AES(key), modes.CBC(iv),
                           backend=default_backend()).decryptor()

        # Round up to a whole number of blocks with integer arithmetic;
        # ``/`` is true division in this module.
        pathname_block_size = -(-pathname_size // bs) * bs
        pathname_data = in_fd.read(pathname_block_size)
        if len(pathname_data) < pathname_block_size:
            raise DecryptError(
                "pathname length is not correct, expect %d, got %d" %
                (pathname_block_size, len(pathname_data)))
        pathname_data = decryptor.update(pathname_data)[:pathname_size]
        try:
            pathname = pathname_data.decode("utf-8")
        except UnicodeDecodeError:
            raise DecryptError()

        decompress_obj = None
        if flags & self.COMPRESS:
            decompress_obj = zlib.decompressobj()

        md5 = hashlib.md5()
        footer_size = self._FOOTER_SIZE
        # The footer and up to one block of padding end the stream, and may
        # straddle two reads; keep that many bytes back until EOF is seen.
        hold = footer_size + bs
        pending = b''
        while True:
            chunk = in_fd.read(self.BUFFER_SIZE)
            if not chunk:
                break
            pending += decryptor.update(chunk)
            if len(pending) > hold:
                self._write_plain(pending[:-hold], decompress_obj, md5,
                                  out_fd)
                pending = pending[-hold:]
        try:
            pending += decryptor.finalize()
        except ValueError:
            # The encrypted data is not a whole number of blocks.
            raise DecryptError()
        if len(pending) < hold:
            raise DecryptError()

        entire_digest = pending[-16:]
        footer = pending[-footer_size:-16]
        # The last padding byte sits right before the footer.
        padding_length = bytearray(
            pending[-footer_size - 1:-footer_size])[0]
        if not 1 <= padding_length <= bs:
            raise DecryptError()
        self._write_plain(pending[:-footer_size - padding_length],
                          decompress_obj, md5, out_fd)
        if decompress_obj is not None:
            # Emit any output still buffered by zlib before the digests are
            # taken, so that it is covered by the checks below.
            rest = decompress_obj.flush()
            md5.update(rest)
            out_fd.write(rest)

        content_digest_check = md5.digest()
        md5.update(footer)
        entire_digest_check = md5.digest()

        file_entry = self._unpack_footer(pathname, footer)
        file_entry.salt = salt
        if file_entry.digest != content_digest_check or \
                entire_digest != entire_digest_check:
            raise DecryptError()
        return file_entry
Вы можете оставить комментарий после входа в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарий ( 0 )