电脑装配网

xlog是什么文件(xlog文件的解码操作)

 人阅读 | 作者奔跑的小羊 | 时间:2022-11-03 11:44

前面介绍过引入xlog,将日志打印成一个xlog文件,这里就来介绍一下怎么将xlog文件解码成正常的可读日志文件。

对于不加密的xlog进行解码

如果在Android代码里初始化xlog的时候,使用的是不加密的方式,比如这样:

Xlog.open(false, Xlog.LEVEL_DEBUG, Xlog.AppednerModeAsync, "", logPath, "dbxLog", "");

最后一个入参表示pubkey,传空就代表不对日志内容进行加密。

那就可以直接用mars工程里的
decode_mars_nocrypt_log_file.py文件直接转换,但是mars提供的这个python工具是python2的版本。现在多数都是用python3的,如果你愿意在本地配多个python版本,那就配置一个python2,添加一些需要的库,直接执行那个就可以。

如果你不想配置python2,就想用python3执行,也可以手动改一下这个python文件,这是我改完后的:

#!/usr/bin/python

import sys
import os
import glob
import zlib
import struct
import binascii
import traceback

MAGIC_NO_COMPRESS_START = 0x03
MAGIC_NO_COMPRESS_START1 = 0x06
MAGIC_NO_COMPRESS_NO_CRYPT_START = 0x08
MAGIC_COMPRESS_START = 0x04
MAGIC_COMPRESS_START1 = 0x05
MAGIC_COMPRESS_START2 = 0x07
MAGIC_COMPRESS_NO_CRYPT_START = 0x09

MAGIC_END = 0x00

lastseq = 0


def IsGoodLogBuffer(_buffer, _offset, count):
    if _offset == len(_buffer): return (True, '')

    magic_start = _buffer[_offset]
    if MAGIC_NO_COMPRESS_START == magic_start or MAGIC_COMPRESS_START == magic_start or MAGIC_COMPRESS_START1 == magic_start:
        crypt_key_len = 4
    elif MAGIC_COMPRESS_START2 == magic_start or MAGIC_NO_COMPRESS_START1 == magic_start or MAGIC_NO_COMPRESS_NO_CRYPT_START == magic_start or MAGIC_COMPRESS_NO_CRYPT_START == magic_start:
        crypt_key_len = 64
    else:
        return (False, '_buffer[%d]:%d != MAGIC_NUM_START' % (_offset, _buffer[_offset]))

    headerLen = 1 + 2 + 1 + 1 + 4 + crypt_key_len

    if _offset + headerLen + 1 + 1 > len(_buffer): return (
        False, 'offset:%d > len(buffer):%d' % (_offset, len(_buffer)))
    start = _offset + headerLen - 4 - crypt_key_len
    length = struct.unpack_from("I", memoryview(_buffer)[start:start + 4].tobytes())[0]
    if _offset + headerLen + length + 1 > len(_buffer):
        return (
            False,
            'log length:%d, end pos %d > len(buffer):%d' % (length, _offset + headerLen + length + 1, len(_buffer)))
    if MAGIC_END != _buffer[_offset + headerLen + length]: return (False,
                                                                   'log length:%d, buffer[%d]:%d != MAGIC_END' % (
                                                                       length, _offset + headerLen + length,
                                                                       _buffer[_offset + headerLen + length]))

    if (1 >= count):
        return (True, '')
    else:
        return IsGoodLogBuffer(_buffer, _offset + headerLen + length + 1, count - 1)


def GetLogStartPos(_buffer, _count):
    offset = 0
    while True:
        if offset >= len(_buffer): break

        if MAGIC_NO_COMPRESS_START == _buffer[offset] or MAGIC_NO_COMPRESS_START1 == _buffer[
            offset] or MAGIC_COMPRESS_START == _buffer[offset] or MAGIC_COMPRESS_START1 == _buffer[
            offset] or MAGIC_COMPRESS_START2 == _buffer[offset] or MAGIC_COMPRESS_NO_CRYPT_START == _buffer[
            offset] or MAGIC_NO_COMPRESS_NO_CRYPT_START == _buffer[offset]:
            if IsGoodLogBuffer(_buffer, offset, _count)[0]: return offset
        offset += 1

    return -1


def DecodeBuffer(_buffer, _offset, _outbuffer):
    if _offset >= len(_buffer): return -1
    # if _offset + 1 + 4 + 1 + 1 > len(_buffer): return -1
    ret = IsGoodLogBuffer(_buffer, _offset, 1)
    if not ret[0]:
        fixpos = GetLogStartPos(_buffer[_offset:], 1)
        if -1 == fixpos:
            return -1
        else:
            _outbuffer.extend("[F]decode_log_file.py decode error len=%d, result:%s \n" % (fixpos, ret[1]))
            _offset += fixpos

    magic_start = _buffer[_offset]
    if MAGIC_NO_COMPRESS_START == magic_start or MAGIC_COMPRESS_START == magic_start or MAGIC_COMPRESS_START1 == magic_start:
        crypt_key_len = 4
    elif MAGIC_COMPRESS_START2 == magic_start or MAGIC_NO_COMPRESS_START1 == magic_start or MAGIC_NO_COMPRESS_NO_CRYPT_START == magic_start or MAGIC_COMPRESS_NO_CRYPT_START == magic_start:
        crypt_key_len = 64
    else:
        _outbuffer.extend('in DecodeBuffer _buffer[%d]:%d != MAGIC_NUM_START' % (_offset, magic_start))
        return -1

    headerLen = 1 + 2 + 1 + 1 + 4 + crypt_key_len
    start = _offset + headerLen - 4 - crypt_key_len
    length = struct.unpack_from("I", memoryview(_buffer)[start:start + 4])[0]
    tmpbuffer = bytearray(length)

    seq = struct.unpack_from("H", memoryview(_buffer)[start - 2 - 2: start - 2])[0]
    begin_hour = struct.unpack_from("c", memoryview(_buffer)[start - 1 - 1:start - 1])[0]
    end_hour = struct.unpack_from("c", memoryview(_buffer)[start - 1:start])[0]

    global lastseq
    if seq != 0 and seq != 1 and lastseq != 0 and seq != (lastseq + 1):
        _outbuffer.extend("[F]decode_log_file.py log seq:%d-%d is missing\n" % (lastseq + 1, seq - 1))

    if seq != 0:
        lastseq = seq

    tmpbuffer[:] = _buffer[_offset + headerLen:_offset + headerLen + length]

    try:
        decompressor = zlib.decompressobj(-zlib.MAX_WBITS)

        if MAGIC_NO_COMPRESS_START1 == _buffer[_offset] or MAGIC_COMPRESS_START2 == _buffer[_offset]:
            print("use wrong decode script")
        elif MAGIC_COMPRESS_START == _buffer[_offset] or MAGIC_COMPRESS_NO_CRYPT_START == _buffer[_offset]:
            tmpbuffer = decompressor.decompress(bytes(tmpbuffer))
        elif MAGIC_COMPRESS_START1 == _buffer[_offset]:
            decompress_data = bytearray()
            while len(tmpbuffer) > 0:
                single_log_len = struct.unpack_from("H", memoryview(tmpbuffer)[0:2])[0]
                decompress_data.extend(tmpbuffer[2:single_log_len + 2])
                tmpbuffer[:] = tmpbuffer[single_log_len + 2:len(tmpbuffer)]

            tmpbuffer = decompressor.decompress(str(decompress_data))

        else:
            pass

            # _outbuffer.extend('seq:%d, hour:%d-%d len:%d decompress:%d\n' %(seq, ord(begin_hour), ord(end_hour), length, len(tmpbuffer)))
    except Exception as e:
        traceback.print_exc()
        _outbuffer.extend("[F]decode_log_file.py decompress err, " + str(e) + "\n")
        return _offset + headerLen + length + 1

    _outbuffer.extend(tmpbuffer)

    return _offset + headerLen + length + 1


def ParseFile(_file, _outfile):
    fp = open(_file, "rb")
    _buffer = bytearray(os.path.getsize(_file))
    fp.readinto(_buffer)
    fp.close()
    startpos = GetLogStartPos(_buffer, 2)
    if -1 == startpos:
        return

    outbuffer = bytearray()

    while True:
        startpos = DecodeBuffer(_buffer, startpos, outbuffer)
        if -1 == startpos: break;

    if 0 == len(outbuffer): return

    fpout = open(_outfile, "wb")
    fpout.write(outbuffer)
    fpout.close()


def main(args):
    global lastseq

    if 1 == len(args):
        if os.path.isdir(args[0]):
            filelist = glob.glob(args[0] + "/*.xlog")
            for filepath in filelist:
                lastseq = 0
                ParseFile(filepath, filepath + ".log")
        else:
            ParseFile(args[0], args[0] + ".log")
    elif 2 == len(args):
        ParseFile(args[0], args[1])
    else:
        filelist = glob.glob("*.xlog")
        for filepath in filelist:
            lastseq = 0
            ParseFile(filepath, filepath + ".log")


if __name__ == "__main__":
    main(sys.argv[1:])

改的思路其实也很简单,用python3执行,哪里报错改哪里,主要是buffer()方法在python3中没有了,换了一个方法。

这个是我修改后文件(
decode_mars_nocrypt_log_file.py)的地址:

https://www.aliyundrive.com/s/CTiyzYJWjVK

解码的具体操作就是直接执行,将xlog文件作为参数传入:

python decode_mars_nocrypt_log_file.py dbxLog_20220514.xlog

对于加密的xlog进行解码

mars已经提供了加密的工具:gen_key.py,直接执行就可以获取一组随机的公钥、私钥:

通过gen_key.py获取的密钥

在Android代码中初始化xlog的时候,将公钥传递给指定参数,输出的日志就会进行加密:

Xlog.open(false, Xlog.LEVEL_DEBUG, Xlog.AppednerModeAsync, "", logPath, "dbxLog", "68f0b7d5c8a792e1ea94cfc5aaad0db0840282e2b8f5a82f369a996f681c6cd1292f2d6d06712eaf735459584819c4fa71b94f2d9bd53837782ea35aef52ef35");

解码解密的工具是这个:
decode_mars_crypt_log_file.py,mars也有提供,不过不能直接使用,需要将里面的密钥需要进行修改:

更新密钥

需要注意mars提供的这个工具,也是需要python2执行。我有修改成python3的,如果有需要可以看一下:

#!/usr/bin/python

import sys
import os
import glob
import zlib
import struct
import binascii
import pyelliptic
import traceback

MAGIC_NO_COMPRESS_START = 0x03
MAGIC_NO_COMPRESS_START1 = 0x06
MAGIC_NO_COMPRESS_NO_CRYPT_START = 0x08
MAGIC_COMPRESS_START = 0x04
MAGIC_COMPRESS_START1 = 0x05
MAGIC_COMPRESS_START2 = 0x07
MAGIC_COMPRESS_NO_CRYPT_START = 0x09

MAGIC_END = 0x00

lastseq = 0

PRIV_KEY = b'babff40958d0346b8c602dff415e082e94ed5872903ed0ea2a3b198cd3e5d454'
PUB_KEY = b'68f0b7d5c8a792e1ea94cfc5aaad0db0840282e2b8f5a82f369a996f681c6cd1'b'292f2d6d06712eaf735459584819c4fa71b94f2d9bd53837782ea35aef52ef35'


def tea_decipher(v, k):
    op = 0xffffffff
    v0, v1 = struct.unpack('=LL', v[0:8])
    k1, k2, k3, k4 = struct.unpack('=LLLL', k[0:16])
    delta = 0x9E3779B9
    s = (delta << 4) & op
    for i in range(16):
        v1 = (v1 - (((v0 << 4) + k3) ^ (v0 + s) ^ ((v0 >> 5) + k4))) & op
        v0 = (v0 - (((v1 << 4) + k1) ^ (v1 + s) ^ ((v1 >> 5) + k2))) & op
        s = (s - delta) & op
    return struct.pack('=LL', v0, v1)


def tea_decrypt(v, k):
    num = int(len(v) / 8 * 8)
    ret = b''
    for i in range(0, num, 8):
        vi = v[i:i + 8]
        if len(vi) != 8:
            continue
        x = tea_decipher(vi, k)
        ret += x

    ret += v[num:]
    return ret


def IsGoodLogBuffer(_buffer, _offset, count):
    if _offset == len(_buffer): return (True, '')

    magic_start = _buffer[_offset]
    if MAGIC_NO_COMPRESS_START == magic_start or MAGIC_COMPRESS_START == magic_start or MAGIC_COMPRESS_START1 == magic_start:
        crypt_key_len = 4
    elif MAGIC_COMPRESS_START2 == magic_start or MAGIC_NO_COMPRESS_START1 == magic_start or MAGIC_NO_COMPRESS_NO_CRYPT_START == magic_start or MAGIC_COMPRESS_NO_CRYPT_START == magic_start:
        crypt_key_len = 64
    else:
        return False, '_buffer[%d]:%d != MAGIC_NUM_START' % (_offset, _buffer[_offset])

    headerLen = 1 + 2 + 1 + 1 + 4 + crypt_key_len

    if _offset + headerLen + 1 + 1 > len(_buffer):
        return False, 'offset:%d > len(buffer):%d' % (_offset, len(_buffer))
    start = _offset + headerLen - 4 - crypt_key_len
    length = struct.unpack_from("I", memoryview(_buffer)[start:start + 4])[0]
    if _offset + headerLen + length + 1 > len(_buffer):
        return False, 'log length:%d, end pos %d > len(buffer):%d' % (
            length, _offset + headerLen + length + 1, len(_buffer))
    if MAGIC_END != _buffer[_offset + headerLen + length]:
        return False, 'log length:%d, buffer[%d]:%d != MAGIC_END' % (
            length, _offset + headerLen + length, _buffer[_offset + headerLen + length])

    if (1 >= count):
        return (True, '')
    else:
        return IsGoodLogBuffer(_buffer, _offset + headerLen + length + 1, count - 1)


def GetLogStartPos(_buffer, _count):
    offset = 0
    while True:
        if offset >= len(_buffer): break

        if MAGIC_NO_COMPRESS_START == _buffer[offset] or MAGIC_NO_COMPRESS_START1 == _buffer[
            offset] or MAGIC_COMPRESS_START == _buffer[offset] or MAGIC_COMPRESS_START1 == _buffer[
            offset] or MAGIC_COMPRESS_START2 == _buffer[offset] or MAGIC_COMPRESS_NO_CRYPT_START == _buffer[
            offset] or MAGIC_NO_COMPRESS_NO_CRYPT_START == _buffer[offset]:
            if IsGoodLogBuffer(_buffer, offset, _count)[0]: return offset
        offset += 1

    return -1


def DecodeBuffer(_buffer, _offset, _outbuffer):
    if _offset >= len(_buffer):
        return -1
    # if _offset + 1 + 4 + 1 + 1 > len(_buffer): return -1
    ret = IsGoodLogBuffer(_buffer, _offset, 1)
    if not ret[0]:
        fixpos = GetLogStartPos(_buffer[_offset:], 1)
        if -1 == fixpos:
            return -1
        else:
            _outbuffer.extend("[F]decode_log_file.py decode error len=%d, result:%s \n" % (fixpos, ret[1]))
            _offset += fixpos

    magic_start = _buffer[_offset]
    if MAGIC_NO_COMPRESS_START == magic_start or MAGIC_COMPRESS_START == magic_start \
            or MAGIC_COMPRESS_START1 == magic_start:
        crypt_key_len = 4
    elif MAGIC_COMPRESS_START2 == magic_start or MAGIC_NO_COMPRESS_START1 == magic_start \
            or MAGIC_NO_COMPRESS_NO_CRYPT_START == magic_start or MAGIC_COMPRESS_NO_CRYPT_START == magic_start:
        crypt_key_len = 64
    else:
        _outbuffer.extend('in DecodeBuffer _buffer[%d]:%d != MAGIC_NUM_START' % (_offset, magic_start))
        return -1
    headerLen = 1 + 2 + 1 + 1 + 4 + crypt_key_len
    start = _offset + headerLen - 4 - crypt_key_len
    length = struct.unpack_from("I", memoryview(_buffer)[start: start + 4])[0]
    tmpbuffer = bytearray(length)

    seq = struct.unpack_from("H", memoryview(_buffer)[start - 2 - 2:start - 2])[0]
    begin_hour = struct.unpack_from("c", memoryview(_buffer)[start - 1 - 1:start - 1])[0]
    end_hour = struct.unpack_from("c", memoryview(_buffer)[start - 1:start])[0]

    global lastseq
    if seq != 0 and seq != 1 and lastseq != 0 and seq != (lastseq + 1):
        _outbuffer.extend("[F]decode_log_file.py log seq:%d-%d is missing\n" % (lastseq + 1, seq - 1))

    if seq != 0:
        lastseq = seq

    tmpbuffer[:] = _buffer[_offset + headerLen:_offset + headerLen + length]

    try:
        decompressor = zlib.decompressobj(-zlib.MAX_WBITS)
        if MAGIC_NO_COMPRESS_START1 == _buffer[_offset]:
            pass
        elif MAGIC_COMPRESS_START2 == _buffer[_offset]:
            svr = pyelliptic.ECC(curve='secp256k1')
            client = pyelliptic.ECC(curve='secp256k1')
            start = _offset + headerLen - crypt_key_len
            client.pubkey_x = memoryview(_buffer)[start:int(start + crypt_key_len / 2)].tobytes()
            client.pubkey_y = memoryview(_buffer)[int(start + crypt_key_len / 2):start + crypt_key_len].tobytes()

            svr.privkey = binascii.unhexlify(PRIV_KEY)
            tea_key = svr.get_ecdh_key(client.get_pubkey())

            tmpbuffer = tea_decrypt(tmpbuffer, tea_key)
            tmpbuffer = decompressor.decompress(bytes(tmpbuffer))
        elif MAGIC_COMPRESS_START == _buffer[_offset] or MAGIC_COMPRESS_NO_CRYPT_START == _buffer[_offset]:
            tmpbuffer = decompressor.decompress(bytes(tmpbuffer))
        elif MAGIC_COMPRESS_START1 == _buffer[_offset]:
            decompress_data = bytearray()
            while len(tmpbuffer) > 0:
                single_log_len = struct.unpack_from("H", memoryview(tmpbuffer)[0:2])[0]
                decompress_data.extend(tmpbuffer[2:single_log_len + 2])
                tmpbuffer[:] = tmpbuffer[single_log_len + 2:len(tmpbuffer)]

            tmpbuffer = decompressor.decompress(str(decompress_data))

        else:
            pass

            # _outbuffer.extend('seq:%d, hour:%d-%d len:%d decompress:%d\n' %(seq, ord(begin_hour), ord(end_hour), length, len(tmpbuffer)))
    except Exception as e:
        traceback.print_exc()
        _outbuffer.extend("[F]decode_log_file.py decompress err, " + str(e) + "\n")
        return _offset + headerLen + length + 1

    _outbuffer.extend(tmpbuffer)

    return _offset + headerLen + length + 1

使用方式也是一样的:

python decode_mars_crypt_log_file.py dbxLog_20220514.xlog



def ParseFile(_file, _outfile):
    fp = open(_file, "rb")
    _buffer = bytearray(os.path.getsize(_file))
    fp.readinto(_buffer)
    fp.close()
    startpos = GetLogStartPos(_buffer, 2)
    if -1 == startpos:
        return

    outbuffer = bytearray()

    while True:
        startpos = DecodeBuffer(_buffer, startpos, outbuffer)
        if -1 == startpos:
            break

    if 0 == len(outbuffer):
        return

    fpout = open(_outfile, "wb")
    fpout.write(outbuffer)
    fpout.close()


def main(args):
    global lastseq

    if 1 == len(args):
        if os.path.isdir(args[0]):
            filelist = glob.glob(args[0] + "/*.xlog")
            for filepath in filelist:
                lastseq = 0
                ParseFile(filepath, filepath + ".log")
        else:
            ParseFile(args[0], args[0] + ".log")
    elif 2 == len(args):
        ParseFile(args[0], args[1])
    else:
        filelist = glob.glob("*.xlog")
        for filepath in filelist:
            lastseq = 0
            ParseFile(filepath, filepath + ".log")


if __name__ == "__main__":
    main(sys.argv[1:])

文章标签:

本文链接:『转载请注明出处』