Loading sql/ha_ndbcluster_binlog.cc +62 −52 Original line number Diff line number Diff line Loading @@ -804,7 +804,7 @@ void ndbcluster_setup_binlog_table_shares(THD *thd) #define SCHEMA_SLOCK_SIZE 32u #define SCHEMA_QUERY_SIZE 4096u struct Cluster_replication_schema struct Cluster_schema { unsigned char db_length; char db[64]; Loading @@ -825,7 +825,7 @@ struct Cluster_replication_schema Transfer schema table data into corresponding struct */ static void ndbcluster_get_schema(TABLE *table, Cluster_replication_schema *s) Cluster_schema *s) { Field **field; /* db varchar 1 length byte */ Loading Loading @@ -1153,7 +1153,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, /* remove any unsubscribed from share->slock */ bitmap_intersect(&share->slock_bitmap, &schema_subscribers); DBUG_DUMP("share->slock_bitmap.bitmap", (char*)share->slock_bitmap.bitmap, DBUG_DUMP("share->slock_bitmap.bitmap", (char*)share->slock_bitmap.bitmap, no_bytes_in_map(&share->slock_bitmap)); if (bitmap_is_clear_all(&share->slock_bitmap)) Loading Loading @@ -1484,9 +1485,9 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, static int ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, NdbEventOperation *pOp, List<Cluster_replication_schema> List<Cluster_schema> *post_epoch_log_list, List<Cluster_replication_schema> List<Cluster_schema> *post_epoch_unlock_list, MEM_ROOT *mem_root) { Loading @@ -1497,48 +1498,51 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, NDBEVENT::TableEvent ev_type= pOp->getEventType(); DBUG_PRINT("enter", ("%s.%s ev_type: %d", share->db, share->table_name, ev_type)); switch (ev_type) { case NDBEVENT::TE_UPDATE: /* fall through */ case NDBEVENT::TE_INSERT: if (ev_type == NDBEVENT::TE_UPDATE || ev_type == NDBEVENT::TE_INSERT) { Cluster_replication_schema *schema= (Cluster_replication_schema *) sql_alloc(sizeof(Cluster_replication_schema)); Cluster_schema *schema= (Cluster_schema *) sql_alloc(sizeof(Cluster_schema)); MY_BITMAP slock; bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, false); uint node_id= g_ndb_cluster_connection->node_id(); ndbcluster_get_schema(share->table, schema); if (schema->node_id != node_id) { int log_query= 0; int log_query= 0, post_epoch_unlock= 0; DBUG_PRINT("info", ("log query_length: %d query: '%s'", schema->query_length, schema->query)); char key[FN_REFLEN]; build_table_filename(key, sizeof(key), schema->db, schema->name, ""); NDB_SHARE *share= get_share(key, 0, false, false); switch ((enum SCHEMA_OP_TYPE)schema->type) { case SOT_DROP_TABLE: /* binlog dropping table after any table operations */ if (share && share->op) { post_epoch_log_list->push_back(schema, mem_root); /* acknowledge this query _after_ epoch completion */ post_epoch_unlock= 1; } /* table is either ignored or logging is postponed to later */ log_query= 0; break; case SOT_RENAME_TABLE: if (share && share->op) { log_query= 0; post_epoch_log_list->push_back(schema, mem_root); /* acknowledge this query _after_ epoch completion */ post_epoch_unlock= 1; break; /* discovery will be handled by binlog */ } goto sot_create_table; case SOT_ALTER_TABLE: if (share && share->op) { log_query= 0; post_epoch_log_list->push_back(schema, mem_root); /* acknowledge this query _after_ epoch completion */ post_epoch_unlock= 1; break; /* discovery will be handled by binlog */ } goto sot_create_table; Loading Loading @@ -1571,8 +1575,11 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, TRUE); /* don't binlog the query */ /* binlog dropping database after any table operations */ if (ndb_binlog_running) { post_epoch_log_list->push_back(schema, mem_root); log_query= 0; /* acknowledge this query _after_ epoch completion */ post_epoch_unlock= 1; } break; case SOT_CREATE_DB: /* fall through */ Loading @@ -1581,7 +1588,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, schema->query + schema->query_length, TRUE, /* print error */ FALSE); /* binlog the query */ log_query= 0; break; case SOT_CLEAR_SLOCK: { Loading Loading @@ -1609,25 +1615,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, free_share(&share); share= 0; } /* signal that schema operation has been handled */ if ((enum SCHEMA_OP_TYPE)schema->type != SOT_CLEAR_SLOCK) { DBUG_DUMP("slock", (char*)schema->slock, schema->slock_length); if (bitmap_is_set(&slock, node_id)) { /* If it is an SOT_ALTER_TABLE we need to acknowledge the schema operation _after_ all the events have been processed so that all schema events coming through the event operation has been processed */ if ((enum SCHEMA_OP_TYPE)schema->type == SOT_ALTER_TABLE) post_epoch_unlock_list->push_back(schema, mem_root); else ndbcluster_update_slock(thd, schema->db, schema->name); } } if (log_query && ndb_binlog_running) { char *thd_db_save= thd->db; Loading @@ -1637,9 +1624,23 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, schema->name[0] == 0 || thd->db[0] == 0); thd->db= thd_db_save; } /* signal that schema operation has been handled */ DBUG_DUMP("slock", (char*)schema->slock, schema->slock_length); if (bitmap_is_set(&slock, node_id)) { if (post_epoch_unlock) post_epoch_unlock_list->push_back(schema, mem_root); else ndbcluster_update_slock(thd, schema->db, schema->name); } } break; DBUG_RETURN(0); } /* the normal case of UPDATE/INSERT has already been handled */ switch (ev_type) { case NDBEVENT::TE_DELETE: // skip break; Loading Loading @@ -1726,13 +1727,13 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, */ static void ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd, List<Cluster_replication_schema> List<Cluster_schema> *post_epoch_log_list, List<Cluster_replication_schema> List<Cluster_schema> *post_epoch_unlock_list) { DBUG_ENTER("ndb_binlog_thread_handle_schema_event_post_epoch"); Cluster_replication_schema *schema; Cluster_schema *schema; while ((schema= post_epoch_log_list->pop())) { DBUG_PRINT("info", ("log query_length: %d query: '%s'", Loading Loading @@ -2120,7 +2121,8 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, } if (share->flags & NSF_NO_BINLOG) { DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x %d", share->flags, share->flags & NSF_NO_BINLOG)); DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x %d", share->flags, share->flags & NSF_NO_BINLOG)); DBUG_RETURN(0); } Loading @@ -2137,7 +2139,8 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, share->key); if (push_warning) push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, ER_ILLEGAL_HA_CREATE_OPTION, ER(ER_ILLEGAL_HA_CREATE_OPTION), ER_ILLEGAL_HA_CREATE_OPTION, ER(ER_ILLEGAL_HA_CREATE_OPTION), ndbcluster_hton.name, "Binlog of table with BLOB attribute and no PK"); Loading Loading @@ -2268,7 +2271,8 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, if (share->flags & NSF_NO_BINLOG) { DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x", share->flags)); DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x", share->flags)); DBUG_RETURN(0); } Loading Loading @@ -2690,7 +2694,8 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, { case NDBEVENT::TE_INSERT: row.n_inserts++; DBUG_PRINT("info", ("INSERT INTO %s.%s", table_s->db.str, table_s->table_name.str)); DBUG_PRINT("info", ("INSERT INTO %s.%s", table_s->db.str, table_s->table_name.str)); { if (share->flags & NSF_BLOB_FLAG) { Loading @@ -2701,14 +2706,16 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, DBUG_ASSERT(ret == 0); } ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]); int ret= trans.write_row(::server_id, injector::transaction::table(table, true), int ret= trans.write_row(::server_id, injector::transaction::table(table, true), &b, n_fields, table->record[0]); DBUG_ASSERT(ret == 0); } break; case NDBEVENT::TE_DELETE: row.n_deletes++; DBUG_PRINT("info",("DELETE FROM %s.%s", table_s->db.str, table_s->table_name.str)); DBUG_PRINT("info",("DELETE FROM %s.%s", table_s->db.str, table_s->table_name.str)); { /* table->record[0] contains only the primary key in this case Loading Loading @@ -2737,14 +2744,16 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, } ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]); DBUG_EXECUTE("info", print_records(table, table->record[n]);); int ret= trans.delete_row(::server_id, injector::transaction::table(table, true), int ret= trans.delete_row(::server_id, injector::transaction::table(table, true), &b, n_fields, table->record[n]); DBUG_ASSERT(ret == 0); } break; case NDBEVENT::TE_UPDATE: row.n_updates++; DBUG_PRINT("info", ("UPDATE %s.%s", table_s->db.str, table_s->table_name.str)); DBUG_PRINT("info", ("UPDATE %s.%s", table_s->db.str, table_s->table_name.str)); { if (share->flags & NSF_BLOB_FLAG) { Loading Loading @@ -3025,15 +3034,16 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) MEM_ROOT *old_root= *root_ptr; MEM_ROOT mem_root; init_sql_alloc(&mem_root, 4096, 0); List<Cluster_replication_schema> post_epoch_log_list; List<Cluster_replication_schema> post_epoch_unlock_list; List<Cluster_schema> post_epoch_log_list; List<Cluster_schema> post_epoch_unlock_list; *root_ptr= &mem_root; if (unlikely(schema_res > 0)) { schema_ndb-> setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip); schema_ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage); schema_ndb-> setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage); NdbEventOperation *pOp= schema_ndb->nextEvent(); while (pOp != NULL) { Loading Loading
sql/ha_ndbcluster_binlog.cc +62 −52 Original line number Diff line number Diff line Loading @@ -804,7 +804,7 @@ void ndbcluster_setup_binlog_table_shares(THD *thd) #define SCHEMA_SLOCK_SIZE 32u #define SCHEMA_QUERY_SIZE 4096u struct Cluster_replication_schema struct Cluster_schema { unsigned char db_length; char db[64]; Loading @@ -825,7 +825,7 @@ struct Cluster_replication_schema Transfer schema table data into corresponding struct */ static void ndbcluster_get_schema(TABLE *table, Cluster_replication_schema *s) Cluster_schema *s) { Field **field; /* db varchar 1 length byte */ Loading Loading @@ -1153,7 +1153,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, /* remove any unsubscribed from share->slock */ bitmap_intersect(&share->slock_bitmap, &schema_subscribers); DBUG_DUMP("share->slock_bitmap.bitmap", (char*)share->slock_bitmap.bitmap, DBUG_DUMP("share->slock_bitmap.bitmap", (char*)share->slock_bitmap.bitmap, no_bytes_in_map(&share->slock_bitmap)); if (bitmap_is_clear_all(&share->slock_bitmap)) Loading Loading @@ -1484,9 +1485,9 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, static int ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, NdbEventOperation *pOp, List<Cluster_replication_schema> List<Cluster_schema> *post_epoch_log_list, List<Cluster_replication_schema> List<Cluster_schema> *post_epoch_unlock_list, MEM_ROOT *mem_root) { Loading @@ -1497,48 +1498,51 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, NDBEVENT::TableEvent ev_type= pOp->getEventType(); DBUG_PRINT("enter", ("%s.%s ev_type: %d", share->db, share->table_name, ev_type)); switch (ev_type) { case NDBEVENT::TE_UPDATE: /* fall through */ case NDBEVENT::TE_INSERT: if (ev_type == NDBEVENT::TE_UPDATE || ev_type == NDBEVENT::TE_INSERT) { Cluster_replication_schema *schema= (Cluster_replication_schema *) sql_alloc(sizeof(Cluster_replication_schema)); Cluster_schema *schema= (Cluster_schema *) sql_alloc(sizeof(Cluster_schema)); MY_BITMAP slock; bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, false); uint node_id= g_ndb_cluster_connection->node_id(); ndbcluster_get_schema(share->table, schema); if (schema->node_id != node_id) { int log_query= 0; int log_query= 0, post_epoch_unlock= 0; DBUG_PRINT("info", ("log query_length: %d query: '%s'", schema->query_length, schema->query)); char key[FN_REFLEN]; build_table_filename(key, sizeof(key), schema->db, schema->name, ""); NDB_SHARE *share= get_share(key, 0, false, false); switch ((enum SCHEMA_OP_TYPE)schema->type) { case SOT_DROP_TABLE: /* binlog dropping table after any table operations */ if (share && share->op) { post_epoch_log_list->push_back(schema, mem_root); /* acknowledge this query _after_ epoch completion */ post_epoch_unlock= 1; } /* table is either ignored or logging is postponed to later */ log_query= 0; break; case SOT_RENAME_TABLE: if (share && share->op) { log_query= 0; post_epoch_log_list->push_back(schema, mem_root); /* acknowledge this query _after_ epoch completion */ post_epoch_unlock= 1; break; /* discovery will be handled by binlog */ } goto sot_create_table; case SOT_ALTER_TABLE: if (share && share->op) { log_query= 0; post_epoch_log_list->push_back(schema, mem_root); /* acknowledge this query _after_ epoch completion */ post_epoch_unlock= 1; break; /* discovery will be handled by binlog */ } goto sot_create_table; Loading Loading @@ -1571,8 +1575,11 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, TRUE); /* don't binlog the query */ /* binlog dropping database after any table operations */ if (ndb_binlog_running) { post_epoch_log_list->push_back(schema, mem_root); log_query= 0; /* acknowledge this query _after_ epoch completion */ post_epoch_unlock= 1; } break; case SOT_CREATE_DB: /* fall through */ Loading @@ -1581,7 +1588,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, schema->query + schema->query_length, TRUE, /* print error */ FALSE); /* binlog the query */ log_query= 0; break; case SOT_CLEAR_SLOCK: { Loading Loading @@ -1609,25 +1615,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, free_share(&share); share= 0; } /* signal that schema operation has been handled */ if ((enum SCHEMA_OP_TYPE)schema->type != SOT_CLEAR_SLOCK) { DBUG_DUMP("slock", (char*)schema->slock, schema->slock_length); if (bitmap_is_set(&slock, node_id)) { /* If it is an SOT_ALTER_TABLE we need to acknowledge the schema operation _after_ all the events have been processed so that all schema events coming through the event operation has been processed */ if ((enum SCHEMA_OP_TYPE)schema->type == SOT_ALTER_TABLE) post_epoch_unlock_list->push_back(schema, mem_root); else ndbcluster_update_slock(thd, schema->db, schema->name); } } if (log_query && ndb_binlog_running) { char *thd_db_save= thd->db; Loading @@ -1637,9 +1624,23 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, schema->name[0] == 0 || thd->db[0] == 0); thd->db= thd_db_save; } /* signal that schema operation has been handled */ DBUG_DUMP("slock", (char*)schema->slock, schema->slock_length); if (bitmap_is_set(&slock, node_id)) { if (post_epoch_unlock) post_epoch_unlock_list->push_back(schema, mem_root); else ndbcluster_update_slock(thd, schema->db, schema->name); } } break; DBUG_RETURN(0); } /* the normal case of UPDATE/INSERT has already been handled */ switch (ev_type) { case NDBEVENT::TE_DELETE: // skip break; Loading Loading @@ -1726,13 +1727,13 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, */ static void ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd, List<Cluster_replication_schema> List<Cluster_schema> *post_epoch_log_list, List<Cluster_replication_schema> List<Cluster_schema> *post_epoch_unlock_list) { DBUG_ENTER("ndb_binlog_thread_handle_schema_event_post_epoch"); Cluster_replication_schema *schema; Cluster_schema *schema; while ((schema= post_epoch_log_list->pop())) { DBUG_PRINT("info", ("log query_length: %d query: '%s'", Loading Loading @@ -2120,7 +2121,8 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, } if (share->flags & NSF_NO_BINLOG) { DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x %d", share->flags, share->flags & NSF_NO_BINLOG)); DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x %d", share->flags, share->flags & NSF_NO_BINLOG)); DBUG_RETURN(0); } Loading @@ -2137,7 +2139,8 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, share->key); if (push_warning) push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, ER_ILLEGAL_HA_CREATE_OPTION, ER(ER_ILLEGAL_HA_CREATE_OPTION), ER_ILLEGAL_HA_CREATE_OPTION, ER(ER_ILLEGAL_HA_CREATE_OPTION), ndbcluster_hton.name, "Binlog of table with BLOB attribute and no PK"); Loading Loading @@ -2268,7 +2271,8 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, if (share->flags & NSF_NO_BINLOG) { DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x", share->flags)); DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x", share->flags)); DBUG_RETURN(0); } Loading Loading @@ -2690,7 +2694,8 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, { case NDBEVENT::TE_INSERT: row.n_inserts++; DBUG_PRINT("info", ("INSERT INTO %s.%s", table_s->db.str, table_s->table_name.str)); DBUG_PRINT("info", ("INSERT INTO %s.%s", table_s->db.str, table_s->table_name.str)); { if (share->flags & NSF_BLOB_FLAG) { Loading @@ -2701,14 +2706,16 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, DBUG_ASSERT(ret == 0); } ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]); int ret= trans.write_row(::server_id, injector::transaction::table(table, true), int ret= trans.write_row(::server_id, injector::transaction::table(table, true), &b, n_fields, table->record[0]); DBUG_ASSERT(ret == 0); } break; case NDBEVENT::TE_DELETE: row.n_deletes++; DBUG_PRINT("info",("DELETE FROM %s.%s", table_s->db.str, table_s->table_name.str)); DBUG_PRINT("info",("DELETE FROM %s.%s", table_s->db.str, table_s->table_name.str)); { /* table->record[0] contains only the primary key in this case Loading Loading @@ -2737,14 +2744,16 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, } ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]); DBUG_EXECUTE("info", print_records(table, table->record[n]);); int ret= trans.delete_row(::server_id, injector::transaction::table(table, true), int ret= trans.delete_row(::server_id, injector::transaction::table(table, true), &b, n_fields, table->record[n]); DBUG_ASSERT(ret == 0); } break; case NDBEVENT::TE_UPDATE: row.n_updates++; DBUG_PRINT("info", ("UPDATE %s.%s", table_s->db.str, table_s->table_name.str)); DBUG_PRINT("info", ("UPDATE %s.%s", table_s->db.str, table_s->table_name.str)); { if (share->flags & NSF_BLOB_FLAG) { Loading Loading @@ -3025,15 +3034,16 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) MEM_ROOT *old_root= *root_ptr; MEM_ROOT mem_root; init_sql_alloc(&mem_root, 4096, 0); List<Cluster_replication_schema> post_epoch_log_list; List<Cluster_replication_schema> post_epoch_unlock_list; List<Cluster_schema> post_epoch_log_list; List<Cluster_schema> post_epoch_unlock_list; *root_ptr= &mem_root; if (unlikely(schema_res > 0)) { schema_ndb-> setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip); schema_ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage); schema_ndb-> setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage); NdbEventOperation *pOp= schema_ndb->nextEvent(); while (pOp != NULL) { Loading