18 Star 26 Fork 104

src-openEuler / qemu

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
migration-Add-multi-thread-compress-ops.patch 13.40 KB
一键复制 编辑 原始数据 按行查看 历史
Jiabo Feng 提交于 2024-04-07 10:21 . QEMU update to version 8.2.0-5
From 5896dedf32c7e4417bd7f3e889ca41a34b06f5db Mon Sep 17 00:00:00 2001
From: Chuan Zheng <zhengchuan@huawei.com>
Date: Sat, 30 Jan 2021 15:57:31 +0800
Subject: [PATCH] migration: Add multi-thread compress ops
Add the MigrationCompressOps and MigrationDecompressOps structures to make
the compression method configurable for multi-thread compression migration.
Signed-off-by: Chuan Zheng <zhengchuan@huawei.com>
Signed-off-by: Zeyu Jin <jinzeyu@huawei.com>
Signed-off-by: Ying Fang <fangying1@huawei.com>
---
migration/options.c | 9 ++
migration/options.h | 1 +
migration/ram-compress.c | 261 ++++++++++++++++++++++++++-------------
migration/ram-compress.h | 31 ++++-
migration/ram.c | 4 +-
5 files changed, 215 insertions(+), 91 deletions(-)
diff --git a/migration/options.c b/migration/options.c
index af7ea7b346..6aaee702dc 100644
--- a/migration/options.c
+++ b/migration/options.c
@@ -799,6 +799,15 @@ int migrate_decompress_threads(void)
return s->parameters.decompress_threads;
}
+CompressMethod migrate_compress_method(void)
+{
+ MigrationState *s;
+
+ s = migrate_get_current();
+
+ return s->parameters.compress_method;
+}
+
uint64_t migrate_downtime_limit(void)
{
MigrationState *s = migrate_get_current();
diff --git a/migration/options.h b/migration/options.h
index 246c160aee..9aca5e41ad 100644
--- a/migration/options.h
+++ b/migration/options.h
@@ -78,6 +78,7 @@ uint8_t migrate_cpu_throttle_increment(void);
uint8_t migrate_cpu_throttle_initial(void);
bool migrate_cpu_throttle_tailslow(void);
int migrate_decompress_threads(void);
+CompressMethod migrate_compress_method(void);
uint64_t migrate_downtime_limit(void);
uint8_t migrate_max_cpu_throttle(void);
uint64_t migrate_max_bandwidth(void);
diff --git a/migration/ram-compress.c b/migration/ram-compress.c
index 2be344acbc..6e37b22492 100644
--- a/migration/ram-compress.c
+++ b/migration/ram-compress.c
@@ -65,26 +65,167 @@ static QemuThread *compress_threads;
static QemuMutex comp_done_lock;
static QemuCond comp_done_cond;
-struct DecompressParam {
- bool done;
- bool quit;
- QemuMutex mutex;
- QemuCond cond;
- void *des;
- uint8_t *compbuf;
- int len;
- z_stream stream;
-};
-typedef struct DecompressParam DecompressParam;
-
static QEMUFile *decomp_file;
static DecompressParam *decomp_param;
static QemuThread *decompress_threads;
+MigrationCompressOps *compress_ops;
+MigrationDecompressOps *decompress_ops;
static QemuMutex decomp_done_lock;
static QemuCond decomp_done_cond;
static CompressResult do_compress_ram_page(CompressParam *param, RAMBlock *block);
+static int zlib_save_setup(CompressParam *param)
+{
+ if (deflateInit(&param->stream,
+ migrate_compress_level()) != Z_OK) {
+ return -1;
+ }
+
+ return 0;
+}
+
+static ssize_t zlib_compress_data(CompressParam *param, size_t size)
+{
+ int err;
+ uint8_t *dest = NULL;
+ z_stream *stream = &param->stream;
+ uint8_t *p = param->originbuf;
+ QEMUFile *f = f = param->file;
+ ssize_t blen = qemu_put_compress_start(f, &dest);
+
+ if (blen < compressBound(size)) {
+ return -1;
+ }
+
+ err = deflateReset(stream);
+ if (err != Z_OK) {
+ return -1;
+ }
+
+ stream->avail_in = size;
+ stream->next_in = p;
+ stream->avail_out = blen;
+ stream->next_out = dest;
+
+ err = deflate(stream, Z_FINISH);
+ if (err != Z_STREAM_END) {
+ return -1;
+ }
+
+ blen = stream->next_out - dest;
+ if (blen < 0) {
+ return -1;
+ }
+
+ qemu_put_compress_end(f, blen);
+ return blen + sizeof(int32_t);
+}
+
+static void zlib_save_cleanup(CompressParam *param)
+{
+ deflateEnd(&param->stream);
+}
+
+static int zlib_load_setup(DecompressParam *param)
+{
+ if (inflateInit(&param->stream) != Z_OK) {
+ return -1;
+ }
+
+ return 0;
+}
+
+static int
+zlib_decompress_data(DecompressParam *param, uint8_t *dest, size_t size)
+{
+ int err;
+
+ z_stream *stream = &param->stream;
+
+ err = inflateReset(stream);
+ if (err != Z_OK) {
+ return -1;
+ }
+
+ stream->avail_in = param->len;
+ stream->next_in = param->compbuf;
+ stream->avail_out = size;
+ stream->next_out = dest;
+
+ err = inflate(stream, Z_NO_FLUSH);
+ if (err != Z_STREAM_END) {
+ return -1;
+ }
+
+ return stream->total_out;
+}
+
+static void zlib_load_cleanup(DecompressParam *param)
+{
+ inflateEnd(&param->stream);
+}
+
+static int zlib_check_len(int len)
+{
+ return len < 0 || len > compressBound(TARGET_PAGE_SIZE);
+}
+
+static int set_compress_ops(void)
+{
+ compress_ops = g_new0(MigrationCompressOps, 1);
+
+ switch (migrate_compress_method()) {
+ case COMPRESS_METHOD_ZLIB:
+ compress_ops->save_setup = zlib_save_setup;
+ compress_ops->save_cleanup = zlib_save_cleanup;
+ compress_ops->compress_data = zlib_compress_data;
+ break;
+ default:
+ return -1;
+ }
+
+ return 0;
+}
+
+static int set_decompress_ops(void)
+{
+ decompress_ops = g_new0(MigrationDecompressOps, 1);
+
+ switch (migrate_compress_method()) {
+ case COMPRESS_METHOD_ZLIB:
+ decompress_ops->load_setup = zlib_load_setup;
+ decompress_ops->load_cleanup = zlib_load_cleanup;
+ decompress_ops->decompress_data = zlib_decompress_data;
+ decompress_ops->check_len = zlib_check_len;
+ break;
+ default:
+ return -1;
+ }
+
+ return 0;
+}
+
+static void clean_compress_ops(void)
+{
+ compress_ops->save_setup = NULL;
+ compress_ops->save_cleanup = NULL;
+ compress_ops->compress_data = NULL;
+
+ g_free(compress_ops);
+ compress_ops = NULL;
+}
+
+static void clean_decompress_ops(void)
+{
+ decompress_ops->load_setup = NULL;
+ decompress_ops->load_cleanup = NULL;
+ decompress_ops->decompress_data = NULL;
+
+ g_free(decompress_ops);
+ decompress_ops = NULL;
+}
+
static void *do_data_compress(void *opaque)
{
CompressParam *param = opaque;
@@ -141,7 +282,7 @@ void compress_threads_save_cleanup(void)
qemu_thread_join(compress_threads + i);
qemu_mutex_destroy(&comp_param[i].mutex);
qemu_cond_destroy(&comp_param[i].cond);
- deflateEnd(&comp_param[i].stream);
+ compress_ops->save_cleanup(&comp_param[i]);
g_free(comp_param[i].originbuf);
qemu_fclose(comp_param[i].file);
comp_param[i].file = NULL;
@@ -152,6 +293,7 @@ void compress_threads_save_cleanup(void)
g_free(comp_param);
compress_threads = NULL;
comp_param = NULL;
+ clean_compress_ops();
}
int compress_threads_save_setup(void)
@@ -161,6 +303,12 @@ int compress_threads_save_setup(void)
if (!migrate_compress()) {
return 0;
}
+
+ if (set_compress_ops() < 0) {
+ clean_compress_ops();
+ return -1;
+ }
+
thread_count = migrate_compress_threads();
compress_threads = g_new0(QemuThread, thread_count);
comp_param = g_new0(CompressParam, thread_count);
@@ -172,8 +320,7 @@ int compress_threads_save_setup(void)
goto exit;
}
- if (deflateInit(&comp_param[i].stream,
- migrate_compress_level()) != Z_OK) {
+ if (compress_ops->save_setup(&comp_param[i]) < 0) {
g_free(comp_param[i].originbuf);
goto exit;
}
@@ -198,50 +345,6 @@ exit:
return -1;
}
-/*
- * Compress size bytes of data start at p and store the compressed
- * data to the buffer of f.
- *
- * Since the file is dummy file with empty_ops, return -1 if f has no space to
- * save the compressed data.
- */
-static ssize_t qemu_put_compression_data(CompressParam *param, size_t size)
-{
- int err;
- uint8_t *dest = NULL;
- z_stream *stream = &param->stream;
- uint8_t *p = param->originbuf;
- QEMUFile *f = f = param->file;
- ssize_t blen = qemu_put_compress_start(f, &dest);
-
- if (blen < compressBound(size)) {
- return -1;
- }
-
- err = deflateReset(stream);
- if (err != Z_OK) {
- return -1;
- }
-
- stream->avail_in = size;
- stream->next_in = p;
- stream->avail_out = blen;
- stream->next_out = dest;
-
- err = deflate(stream, Z_FINISH);
- if (err != Z_STREAM_END) {
- return -1;
- }
-
- blen = stream->next_out - dest;
- if (blen < 0) {
- return -1;
- }
-
- qemu_put_compress_end(f, blen);
- return blen + sizeof(int32_t);
-}
-
static CompressResult do_compress_ram_page(CompressParam *param, RAMBlock *block)
{
uint8_t *p = block->host + (param->offset & TARGET_PAGE_MASK);
@@ -260,7 +363,7 @@ static CompressResult do_compress_ram_page(CompressParam *param, RAMBlock *block
* decompression
*/
memcpy(param->originbuf, p, page_size);
- ret = qemu_put_compression_data(param, page_size);
+ ret = compress_ops->compress_data(param, page_size);
if (ret < 0) {
qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
error_report("compressed data failed!");
@@ -356,32 +459,6 @@ bool compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset,
}
}
-/* return the size after decompression, or negative value on error */
-static int
-qemu_uncompress_data(DecompressParam *param, uint8_t *dest, size_t pagesize)
-{
- int err;
-
- z_stream *stream = &param->stream;
-
- err = inflateReset(stream);
- if (err != Z_OK) {
- return -1;
- }
-
- stream->avail_in = param->len;
- stream->next_in = param->compbuf;
- stream->avail_out = pagesize;
- stream->next_out = dest;
-
- err = inflate(stream, Z_NO_FLUSH);
- if (err != Z_STREAM_END) {
- return -1;
- }
-
- return stream->total_out;
-}
-
static void *do_data_decompress(void *opaque)
{
DecompressParam *param = opaque;
@@ -398,7 +475,7 @@ static void *do_data_decompress(void *opaque)
pagesize = qemu_target_page_size();
- ret = qemu_uncompress_data(param, des, pagesize);
+ ret = decompress_ops->decompress_data(param, des, pagesize);
if (ret < 0 && migrate_get_current()->decompress_error_check) {
error_report("decompress data failed");
qemu_file_set_error(decomp_file, ret);
@@ -466,7 +543,7 @@ void compress_threads_load_cleanup(void)
qemu_thread_join(decompress_threads + i);
qemu_mutex_destroy(&decomp_param[i].mutex);
qemu_cond_destroy(&decomp_param[i].cond);
- inflateEnd(&decomp_param[i].stream);
+ decompress_ops->load_cleanup(&decomp_param[i]);
g_free(decomp_param[i].compbuf);
decomp_param[i].compbuf = NULL;
}
@@ -475,6 +552,7 @@ void compress_threads_load_cleanup(void)
decompress_threads = NULL;
decomp_param = NULL;
decomp_file = NULL;
+ clean_decompress_ops();
}
int compress_threads_load_setup(QEMUFile *f)
@@ -485,6 +563,11 @@ int compress_threads_load_setup(QEMUFile *f)
return 0;
}
+ if (set_decompress_ops() < 0) {
+ clean_decompress_ops();
+ return -1;
+ }
+
/*
* set compression_counters memory to zero for a new migration
*/
@@ -497,7 +580,7 @@ int compress_threads_load_setup(QEMUFile *f)
qemu_cond_init(&decomp_done_cond);
decomp_file = f;
for (i = 0; i < thread_count; i++) {
- if (inflateInit(&decomp_param[i].stream) != Z_OK) {
+ if (decompress_ops->load_setup(&decomp_param[i]) < 0) {
goto exit;
}
diff --git a/migration/ram-compress.h b/migration/ram-compress.h
index 0d89a2f55e..daf241987f 100644
--- a/migration/ram-compress.h
+++ b/migration/ram-compress.h
@@ -39,6 +39,20 @@ enum CompressResult {
};
typedef enum CompressResult CompressResult;
+struct DecompressParam {
+ bool done;
+ bool quit;
+ QemuMutex mutex;
+ QemuCond cond;
+ void *des;
+ uint8_t *compbuf;
+ int len;
+
+ /* for zlib compression */
+ z_stream stream;
+};
+typedef struct DecompressParam DecompressParam;
+
struct CompressParam {
bool done;
bool quit;
@@ -51,11 +65,26 @@ struct CompressParam {
ram_addr_t offset;
/* internally used fields */
- z_stream stream;
uint8_t *originbuf;
+
+ /* for zlib compression */
+ z_stream stream;
};
typedef struct CompressParam CompressParam;
+typedef struct {
+ int (*save_setup)(CompressParam *param);
+ void (*save_cleanup)(CompressParam *param);
+ ssize_t (*compress_data)(CompressParam *param, size_t size);
+} MigrationCompressOps;
+
+typedef struct {
+ int (*load_setup)(DecompressParam *param);
+ void (*load_cleanup)(DecompressParam *param);
+ int (*decompress_data)(DecompressParam *param, uint8_t *dest, size_t size);
+ int (*check_len)(int len);
+} MigrationDecompressOps;
+
void compress_threads_save_cleanup(void);
int compress_threads_save_setup(void);
diff --git a/migration/ram.c b/migration/ram.c
index 8c7886ab79..f9b2b9b985 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -96,6 +96,8 @@
XBZRLECacheStats xbzrle_counters;
+extern MigrationDecompressOps *decompress_ops;
+
/* used by the search for pages to send */
struct PageSearchStatus {
/* The migration channel used for a specific host page */
@@ -3979,7 +3981,7 @@ static int ram_load_precopy(QEMUFile *f)
case RAM_SAVE_FLAG_COMPRESS_PAGE:
len = qemu_get_be32(f);
- if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
+ if (decompress_ops->check_len(len)) {
error_report("Invalid compressed data length: %d", len);
ret = -EINVAL;
break;
--
2.27.0
1
https://gitee.com/src-openeuler/qemu.git
git@gitee.com:src-openeuler/qemu.git
src-openeuler
qemu
qemu
master

搜索帮助