aboutsummaryrefslogtreecommitdiff
path: root/migration.c
diff options
context:
space:
mode:
Diffstat (limited to 'migration.c')
-rw-r--r--migration.c551
1 files changed, 551 insertions, 0 deletions
diff --git a/migration.c b/migration.c
new file mode 100644
index 000000000..82c9d670b
--- /dev/null
+++ b/migration.c
@@ -0,0 +1,551 @@
+#include "vl.h"
+#include "qemu_socket.h"
+#include "migration.h"
+
+#define TO_BE_IMPLEMENTED term_printf("%s: TO_BE_IMPLEMENTED\n", __FUNCTION__)
+
+#ifndef CONFIG_USER_ONLY
+
+/* defined in vl.c */
+int parse_host_port(struct sockaddr_in *saddr, const char *str);
+
+#define FD_UNUSED -1
+
+typedef enum {
+ NONE = 0,
+ WRITER = 1,
+ READER = 2
+} migration_role_t;
+
+typedef struct migration_state {
+ int fd;
+#define BUFFSIZE ( 256 * 1024)
+ unsigned char buff[BUFFSIZE]; /* FIXME: allocate dynamically; use mutli/double buffer */
+ unsigned buffsize;
+ unsigned head, tail;
+ migration_role_t role;
+ int64_t head_counter, tail_counter;
+} migration_state_t;
+
+static migration_state_t ms = {
+ .fd = FD_UNUSED,
+ .buff = { 0 },
+ .buffsize = BUFFSIZE,
+ .head = 0,
+ .tail = 0,
+ .head_counter = 0,
+ .tail_counter = 0
+};
+
+static const char *reader_default_addr="localhost:4455";
+static const char *writer_default_addr="localhost:4456";
+
+/* circular buffer functions */
+static int migration_buffer_empty(migration_state_t *pms)
+{
+ return (pms->head == pms->tail);
+}
+
+static int migration_buffer_bytes_filled(migration_state_t *pms)
+{
+ return (pms->head - pms->tail) % pms->buffsize;
+}
+
+static int migration_buffer_bytes_empty(migration_state_t *pms)
+{
+ return (pms->tail - pms->head -1) % pms->buffsize;
+}
+
+static int migration_buffer_bytes_head_end(migration_state_t *pms)
+{
+ return pms->buffsize - pms->head;
+}
+
+static int migration_buffer_bytes_tail_end(migration_state_t *pms)
+{
+ return pms->buffsize - pms->tail;
+}
+
+static void migration_state_inc_head(migration_state_t *pms, int n)
+{
+ pms->head = (pms->head + n) % pms->buffsize;
+ pms->head_counter += n;
+}
+
+static void migration_state_inc_tail(migration_state_t *pms, int n)
+{
+ pms->tail = (pms->tail + n) % pms->buffsize;
+ pms->tail_counter += n;
+}
+
+
+/* create a network address according to arg/default_addr */
+static int parse_host_port_and_message(struct sockaddr_in *saddr,
+ const char *arg,
+ const char *default_addr,
+ const char *name)
+{
+ if (!arg)
+ arg = default_addr;
+ if (parse_host_port(saddr, arg) < 0) {
+ term_printf("%s: invalid argument '%s'", name, arg);
+ return -1;
+ }
+ return 0;
+}
+
+static void migration_cleanup(migration_state_t *pms)
+{
+ if (pms->fd != FD_UNUSED) {
+#ifdef USE_NONBLOCKING_SOCKETS
+ qemu_set_fd_handler(pms->fd, NULL, NULL, NULL);
+#endif
+ close(pms->fd);
+ pms->fd = FD_UNUSED;
+ }
+}
+
+static int migration_read_from_socket(void *opaque)
+{
+ migration_state_t *pms = (migration_state_t *)opaque;
+ int size, toend;
+
+ if (pms->fd == FD_UNUSED) /* not connected */
+ return 0;
+ while (1) { /* breaking if O.K. */
+ size = migration_buffer_bytes_empty(pms); /* available size */
+ toend = migration_buffer_bytes_head_end(pms);
+ if (size > toend) /* read till end of buffer */
+ size = toend;
+ size = read(pms->fd, pms->buff + pms->head, size);
+ if (size < 0) {
+ if (socket_error() == EINTR)
+ continue;
+ term_printf("migration_read_from_socket: read failed (%s)\n", strerror(errno) );
+ return size;
+ }
+ if (size == 0) {
+ /* connection closed */
+ term_printf("migration_read_from_socket: CONNECTION CLOSED\n");
+ migration_cleanup(pms);
+ /* FIXME: call vm_start on A or B according to migration status ? */
+ return size;
+ }
+ else /* we did read something */
+ break;
+ }
+
+ migration_state_inc_head(pms, size);
+ return size;
+}
+
+static int migration_write_into_socket(void *opaque, int len)
+{
+ migration_state_t *pms = (migration_state_t *)opaque;
+ int size, toend;
+
+ if (pms->fd == FD_UNUSED) /* not connected */
+ return 0;
+ while (1) { /* breaking if O.K. */
+ size = migration_buffer_bytes_filled(pms); /* available size */
+ toend = migration_buffer_bytes_tail_end(pms);
+ if (size > toend) /* write till end of buffer */
+ size = toend;
+ if (size > len)
+ size = len;
+ size = write(pms->fd, pms->buff + pms->tail, size);
+ if (size < 0) {
+ if (socket_error() == EINTR)
+ continue;
+ term_printf("migration_write_into_socket: write failed (%s)\n",
+ strerror(socket_error()) );
+ return size;
+ }
+ if (size == 0) {
+ /* connection closed */
+ term_printf("migration_write_into_socket: CONNECTION CLOSED\n");
+ migration_cleanup(pms);
+ return size;
+ }
+ else /* we did write something */
+ break;
+ }
+
+ migration_state_inc_tail(pms, size);
+ return size;
+}
+
+static void migration_accept(void *opaque)
+{
+ migration_state_t *pms = (migration_state_t *)opaque;
+ socklen_t len;
+ struct sockaddr_in sockaddr;
+ int new_fd;
+
+ for(;;) {
+ len = sizeof(sockaddr);
+ new_fd = accept(pms->fd, (struct sockaddr *)&sockaddr, &len);
+ if (new_fd < 0 && errno != EINTR) {
+ term_printf("migration listen: accept failed (%s)\n",
+ strerror(errno));
+ return;
+ } else if (new_fd >= 0) {
+ break;
+ }
+ }
+
+ /* FIXME: Need to be modified if we want to have a control connection
+ * e.g. cancel/abort
+ */
+ migration_cleanup(pms); /* clean old fd */
+ pms->fd = new_fd;
+
+ term_printf("accepted new socket as fd %d\n", pms->fd);
+
+#ifdef USE_NONBLOCKING_SOCKETS
+ /* start handling I/O */
+ qemu_set_fd_handler(pms->fd, migration_read_from_socket, NULL, NULL);
+#else
+ term_printf("waiting for migration to start...\n");
+ do_migration_start("offline");
+#endif
+
+}
+
+
+void do_migration_listen(char *arg1, char *arg2)
+{
+ struct sockaddr_in local, remote;
+ int val;
+
+ if (ms.fd != FD_UNUSED) {
+ term_printf("Already listening or connection established\n");
+ return;
+ }
+
+ ms.role = READER;
+
+ if (parse_host_port_and_message(&local, arg1, reader_default_addr, "migration listen"))
+ return;
+
+ if (parse_host_port_and_message(&remote, arg2, writer_default_addr, "migration listen"))
+ return;
+
+ ms.fd = socket(PF_INET, SOCK_STREAM, 0);
+ if (ms.fd < 0) {
+ term_printf("migration listen: socket() failed (%s)\n",
+ strerror(errno));
+ return;
+ }
+
+ /* fast reuse of address */
+ val = 1;
+ setsockopt(ms.fd, SOL_SOCKET, SO_REUSEADDR, (const char *)&val, sizeof(val));
+
+ if (bind(ms.fd, &local, sizeof local) < 0 ) {
+ migration_cleanup(&ms);
+ term_printf("migration listen: bind() failed (%s)\n", strerror(errno));
+ return;
+ }
+
+ if (listen(ms.fd, 1) < 0) { /* allow only one connection */
+ migration_cleanup(&ms);
+ term_printf("migration listen: listen() failed (%s)\n", strerror(errno));
+ return;
+ }
+
+#ifdef USE_NONBLOCKING_SOCKETS
+ /* FIXME: should I allow BLOCKING socket after vm_stop() to get full bandwidth? */
+ socket_set_nonblock(fd); /* do not block and delay the guest */
+
+ qemu_set_fd_handler(fd, migration_accept, NULL, NULL); /* wait for connect() */
+#else
+ migration_accept(&ms);
+#endif
+}
+
+/* reads from the socket if needed
+ * returns 0 if connection closed
+ * >0 if buffer is not empty, number of bytes filled
+ * <0 if error occure
+ */
+static int migration_read_some(void)
+{
+ int size;
+
+ if (migration_buffer_empty(&ms))
+ size = migration_read_from_socket(&ms);
+ else
+ size = migration_buffer_bytes_filled(&ms);
+ return size;
+}
+
+
+/* returns the byte read or 0 on error/connection closed */
+int migration_read_byte(void)
+{
+ int val = 0;
+
+ if (migration_read_some() > 0) {
+ val = ms.buff[ms.tail];
+ migration_state_inc_tail(&ms, 1);
+ }
+ return val;
+}
+
+/* returns >=0 the number of bytes actually read,
+ * or <0 if error occured
+ */
+int migration_read_buffer(char *buff, int len)
+{
+ int size, toend, len_req = len;
+ while (len > 0) {
+ size = migration_read_some();
+ if (size < 0)
+ return size;
+ else if (size==0)
+ break;
+ toend = migration_buffer_bytes_tail_end(&ms);
+ if (size > toend)
+ size = toend;
+ if (size > len)
+ size = len;
+ memcpy(buff, &ms.buff[ms.tail], size);
+ migration_state_inc_tail(&ms, size);
+ len -= size;
+ buff += size;
+ }
+ return len_req - len;
+}
+
+
+
+/*
+ * buffer the bytes, and send when threshold reached
+ * FIXME: bandwidth control can be implemented here
+ * returns 0 on success, <0 on error
+ */
+static int migration_write_some(int force)
+{
+ int size, threshold = 1024;
+
+ if (threshold >= ms.buffsize) /* if buffsize is small */
+ threshold = ms.buffsize / 2;
+ size = migration_buffer_bytes_filled(&ms);
+ while (size && (force || (size > threshold))) {
+ size = migration_write_into_socket(&ms, size);
+ if (size < 0) /* error */
+ return size;
+ if (size == 0) { /* connection closed -- announce ERROR */
+ term_printf("migration: other side closed connection\n");
+ return -1;
+ }
+ size = migration_buffer_bytes_filled(&ms);
+ }
+ return 0;
+}
+
+static int migration_write_byte(int val)
+{
+ int rc;
+
+ rc = migration_write_some(0);
+ if ( rc == 0) {
+ rc = 1;
+ ms.buff[ms.head] = val;
+ migration_state_inc_head(&ms, 1);
+ }
+ return rc;
+}
+
+static int migration_write_buffer(const char *buff, int len)
+{
+ int size, toend, len_req = len;
+
+ while (len > 0) {
+ if (migration_write_some(0) < 0)
+ break;
+ size = migration_buffer_bytes_empty(&ms);
+ toend = migration_buffer_bytes_head_end(&ms);
+ if (size > toend)
+ size = toend;
+ if (size > len)
+ size = len;
+ memcpy(&ms.buff[ms.head], buff, size);
+ migration_state_inc_head(&ms, size);
+ len -= size;
+ buff += size;
+ }
+ return len_req - len;
+}
+
+void do_migration_connect(char *arg1, char *arg2)
+{
+ struct sockaddr_in local, remote;
+
+ if (ms.fd != FD_UNUSED) {
+ term_printf("Already connecting or connection established\n");
+ return;
+ }
+
+ ms.role = WRITER;
+
+ if (parse_host_port_and_message(&local, arg1, writer_default_addr, "migration connect"))
+ return;
+
+ if (parse_host_port_and_message(&remote, arg2, reader_default_addr, "migration connect"))
+ return;
+
+ ms.fd = socket(PF_INET, SOCK_STREAM, 0);
+ if (ms.fd < 0) {
+ term_printf("migration connect: socket() failed (%s)\n",
+ strerror(errno));
+ return;
+ }
+
+ if (connect(ms.fd, (struct sockaddr*)&remote, sizeof remote) < 0) {
+ term_printf("migration connect: connect() failed (%s)\n",
+ strerror(errno));
+ migration_cleanup(&ms);
+ return;
+ }
+
+ term_printf("migration connect: connected through fd %d\n", ms.fd);
+}
+
+
+void do_migration_getfd(int fd) { TO_BE_IMPLEMENTED; }
+void do_migration_start(char *deadoralive)
+{
+ int rc = -1;
+ const char *dummy = "online_migration";
+
+ switch (ms.role) {
+ case WRITER:
+ rc = qemu_savevm(dummy, &qemu_savevm_method_socket);
+ break;
+ case READER:
+ rc = qemu_loadvm(dummy, &qemu_savevm_method_socket);
+ break;
+ default:
+ term_printf("ERROR: unexpected role=%d\n", ms.role);
+ break;
+ }
+ term_printf("migration %s\n", (rc)?"failed":"completed");
+}
+
+void do_migration_cancel(void)
+{
+ migration_cleanup(&ms);
+}
+void do_migration_status(void){ TO_BE_IMPLEMENTED; }
+void do_migration_set(char *fmt, ...){ TO_BE_IMPLEMENTED; }
+void do_migration_show(void){ TO_BE_IMPLEMENTED; }
+
+
+
+/*
+ * =============================================
+ * qemu_savevm_method implementation for sockets
+ * =============================================
+ */
+static int qemu_savevm_method_socket_open(QEMUFile *f, const char *filename,
+ const char *flags)
+{
+ if (ms.fd == FD_UNUSED)
+ return -1;
+ f->opaque = (void*)&ms;
+ return 0;
+}
+
+static void qemu_savevm_method_socket_close(QEMUFile *f)
+{
+ migration_state_t *pms = (migration_state_t*)f->opaque;
+ if (pms->role == WRITER) {
+ migration_write_some(1); /* sync */
+ }
+}
+
+static void qemu_savevm_method_socket_put_buffer(QEMUFile *f, const uint8_t *buf, int size)
+{
+ migration_write_buffer(buf, size);
+}
+
+static void qemu_savevm_method_socket_put_byte(QEMUFile *f, int v)
+{
+ migration_write_byte(v);
+}
+
+static int qemu_savevm_method_socket_get_buffer(QEMUFile *f, uint8_t *buf, int size)
+{
+ return migration_read_buffer(buf, size);
+}
+
+static int qemu_savevm_method_socket_get_byte(QEMUFile *f)
+{
+ return migration_read_byte();
+}
+
+static int64_t qemu_savevm_method_socket_tell(QEMUFile *f)
+{
+ migration_state_t *pms = (migration_state_t*)f->opaque;
+ int64_t cnt=-1;
+ if (pms->role == WRITER)
+ cnt = pms->head_counter;
+ else if (pms->role == READER)
+ cnt = pms->tail_counter;
+ return cnt;
+}
+
+/*
+ * hack alert: written to overcome a weakness of our solution (not yet generic).
+ * READER: read 4 bytes of the actual length (and maybe do something)
+ * WRITER: ignore (or maybe do something)
+ */
+static int64_t qemu_savevm_method_socket_seek(QEMUFile *f, int64_t pos, int whence)
+{
+ migration_state_t *pms = (migration_state_t*)f->opaque;
+ unsigned int record_len;
+
+ if (pms->role == READER) {
+ record_len = qemu_get_be32(f);
+ }
+ return 0;
+}
+
+static int qemu_savevm_method_socket_eof(QEMUFile *f)
+{
+ migration_state_t *pms = (migration_state_t*)f->opaque;
+
+ return (pms->fd == FD_UNUSED);
+}
+
+QEMUFile qemu_savevm_method_socket = {
+ .opaque = NULL,
+ .open = qemu_savevm_method_socket_open,
+ .close = qemu_savevm_method_socket_close,
+ .put_byte = qemu_savevm_method_socket_put_byte,
+ .get_byte = qemu_savevm_method_socket_get_byte,
+ .put_buffer = qemu_savevm_method_socket_put_buffer,
+ .get_buffer = qemu_savevm_method_socket_get_buffer,
+ .tell = qemu_savevm_method_socket_tell,
+ .seek = qemu_savevm_method_socket_seek,
+ .eof = qemu_savevm_method_socket_eof
+};
+
+
+
+
+
+#else /* CONFIG_USER_ONLY is defined */
+
+void do_migration_listen(char *arg1, char *arg2) { TO_BE_IMPLEMENTED; }
+void do_migration_connect(char *arg1, char *arg2) { TO_BE_IMPLEMENTED; }
+void do_migration_getfd(int fd) { TO_BE_IMPLEMENTED; }
+void do_migration_start(char *deadoralive) { TO_BE_IMPLEMENTED; }
+void do_migration_cancel(void){ TO_BE_IMPLEMENTED; }
+void do_migration_status(void){ TO_BE_IMPLEMENTED; }
+void do_migration_set(char *fmt, ...){ TO_BE_IMPLEMENTED; }
+void do_migration_show(void){ TO_BE_IMPLEMENTED; }
+
+#endif /* of CONFIG_USER_ONLY is defined */