""" Opinion amplification causes extreme polarization in social networks - polar.py by Soo Ling Lim and Peter J. Bentley This code is only to be used for research purposes. Email: s.lim@cs.ucl.ac.uk example command: python polar.py -k 2 -s -1 -n 100 -t 200 -a 0 -e 0.8 -m 100000 -p 0 -x 0 -as 0.0 """ """ implemented on Python 3.9.1 """ import argparse import math import time import random import sys import json import pickle import numpy as np import os import matplotlib.pyplot as plt from matplotlib.patches import Rectangle import networkx as nx RESULTS_DIRECTORY = 'results/' IMAGE_DIRECTORY = 'image/' image_format = 'png' # constants RANDOM_SEED: int NUM_NODES: int NUM_TIMESTEPS: int ERDOS_P: float AMPLIFIER: float # 0.5 means of 50% people exaggerate CONFIDENCE_THRESHOLD: float # how "open" people are, opinions outside the threshold is ignored PROBABILITY_AMPLIFY: float SAVE_IMAGE: bool MAX_EXAGGERATE: int NUM_CORRECTIONS: int ERDOS_K: int AMPLIFICATION_STRENGTH: float NETWORK_TYPE = 'e' # 'e' (erdos) 's' (scale free) 'b' (barabasi) 'g' (gn graph) BROADCAST_FREQUENCY = 2 # every x days ERDOS_DIRECTED_GRAPH = True PRUNE = False broadcast_day = False # globals MIN_ORIGINAL_OPINION = -1.0 MAX_ORIGINAL_OPINION = 1.0 FIXED_MESSAGE = [] MIN_AMPLIFIED = -2.0 MAX_AMPLIFIED = 2.0 agents = [] x = [] y = [] fixed_message_x = [] fixed_message_y = [] plot_color = [] min_diff = 1000 max_diff = 0 g = None def random_opinion(): return random.uniform(MIN_ORIGINAL_OPINION, MAX_ORIGINAL_OPINION) def random_positive_opinion(): return random.uniform(0, MAX_ORIGINAL_OPINION) def random_negative_opinion(): return random.uniform(MIN_ORIGINAL_OPINION, 0) def remove_duplicates_and_selfloop(): global g print(g.number_of_edges()) g = nx.Graph(g) # remove parallel edges print(g.number_of_edges()) g.remove_edges_from(nx.selfloop_edges(g)) print(g.number_of_edges()) def erdos(): global g g = nx.erdos_renyi_graph(NUM_NODES, p=ERDOS_P, seed=RANDOM_SEED, directed=ERDOS_DIRECTED_GRAPH) if PRUNE: remove_duplicates_and_selfloop() def scale_free(): global g g = nx.scale_free_graph(n=NUM_NODES, seed=RANDOM_SEED) if PRUNE: remove_duplicates_and_selfloop() def random_k_out(): # this contains a lot of duplicated edges global g MAX_FRIENDS = ERDOS_K proportion = int(0.2*NUM_NODES) k = min(MAX_FRIENDS, proportion) g = nx.random_k_out_graph(n=NUM_NODES, k=k, alpha=0.3) if PRUNE: remove_duplicates_and_selfloop() def barabasi(): global g MAX_FRIENDS = ERDOS_K proportion = int(0.2*NUM_NODES) num_edges_to_attach_from_new_node = min(MAX_FRIENDS, proportion) g = nx.barabasi_albert_graph(n=NUM_NODES, m=num_edges_to_attach_from_new_node, seed=RANDOM_SEED) def gn_graph(): global g g = nx.gn_graph(NUM_NODES) def make_network(): if NETWORK_TYPE == 'e': erdos() elif NETWORK_TYPE == 's': scale_free() elif NETWORK_TYPE == 'b': barabasi() elif NETWORK_TYPE == 'g': gn_graph() elif NETWORK_TYPE == 'r': random_k_out() def add_opinions(): for i in g.nodes(): g.nodes[i][-1] = random_opinion() g.nodes[i]['t_0'] = [] # add exageration potential g.nodes[i]['exaggerate'] = is_probability_true(AMPLIFIER) g.nodes[i]['exaggerate_count'] = 0 g.nodes[i]['is_exaggerating_now'] = False def add_new_opinion(t, recipient, new_opinion): try: g.nodes[recipient]['opinion_history'][t] except: g.nodes[recipient]['opinion_history'][t] = [] g.nodes[recipient]['opinion_history'][t].append(new_opinion) def cap(val): if val > MAX_AMPLIFIED: return MAX_AMPLIFIED elif val < MIN_AMPLIFIED: return MIN_AMPLIFIED else: return val def std(data): """ calculate the sample standard deviation. specify ddof=0 to compute the population standard deviation. """ ddof = 1.0 n = len(data) mean = sum(data)/float(n) if n < 2.0: # make it still work for n < 2 std = 0.0 else: ss = sum((x-mean)**2 for x in data) pvar = ss/(n-ddof) std = pvar**0.5 return std def mean(data): n = len(data) return sum(data)/float(n) def analyse(): opinion_std = [] timestep = [] for t in range(NUM_TIMESTEPS): data = [] for i in g.nodes(): data.append(g.nodes[i][t]) opinion_std.append(round(std(data),5)) timestep.append(t) #---------- # Save results #---------- if not os.path.exists(RESULTS_DIRECTORY): os.makedirs(RESULTS_DIRECTORY) results_file = RESULTS_DIRECTORY + 'conflict_a' + str(AMPLIFIER) + '_n' + str(NUM_NODES) + '_k' + str(ERDOS_K) + '_e' + str(CONFIDENCE_THRESHOLD) + '_p' + str(PROBABILITY_AMPLIFY) + '_as' + str(AMPLIFICATION_STRENGTH) + '.pkl' pickle.dump([RANDOM_SEED, timestep, opinion_std], open(results_file, 'wb')) print("Seed and results saved to", results_file) def plot(): #---------- # Save results #---------- if not os.path.exists(RESULTS_DIRECTORY): os.makedirs(RESULTS_DIRECTORY) results_file = RESULTS_DIRECTORY + 'opinion_a' + str(AMPLIFIER) + '_n' + str(NUM_NODES) + '_k' + str(ERDOS_K) + '_e' + str(CONFIDENCE_THRESHOLD) + '_p' + str(PROBABILITY_AMPLIFY) + '_as' + str(AMPLIFICATION_STRENGTH) + '.pkl' pickle.dump([RANDOM_SEED, x, y, plot_color], open(results_file, 'wb')) print("Seed and results saved to", results_file) area = 5 fig, ax = plt.subplots() # add top rectangle background left = 0 bottom = 1.0 width = NUM_TIMESTEPS height = 1.0 ax.add_patch(Rectangle((left, bottom), width, height, color = 'gray', alpha=0.2)) # end bottom rectangle background bottom = -2.0 ax.add_patch(Rectangle((left, bottom), width, height, color = 'gray', alpha=0.2)) ax.scatter(x, y, s=area, c=plot_color, alpha=0.5) ax.scatter(fixed_message_x, fixed_message_y, s=area, c='green', alpha=0.2) ax.set_ylim([MIN_AMPLIFIED, MAX_AMPLIFIED]) ax.set_ylabel('Opinion') ax.set_xlabel('Timestep') ax.margins(x=0) fig.tight_layout() if SAVE_IMAGE: if not os.path.exists(IMAGE_DIRECTORY): os.makedirs(IMAGE_DIRECTORY) plt.savefig(IMAGE_DIRECTORY + '/plot' + str(RANDOM_SEED) + '.' + image_format) plt.clf() else: plt.show() def receive_message(message, receiver, t): if (abs(message - g.nodes[receiver][t-1]) <= CONFIDENCE_THRESHOLD): g.nodes[receiver]['t_'+ str(t+1)].append(message) def is_probability_true(value): return (random.uniform(0.0, 1.0) <= value) # the higher value, the more likely to return true def exceeed_threshold(opinion_list): exceeded = False for item in opinion_list: if item < MIN_ORIGINAL_OPINION or item > MAX_ORIGINAL_OPINION: exceeded = True break return exceeded def run_world(): for t in range(NUM_TIMESTEPS): FIXED_MESSAGE = [] if (t % BROADCAST_FREQUENCY == 0): broadcast_day = True for i in range(NUM_CORRECTIONS): random_fixed_message = random_opinion() FIXED_MESSAGE.append(random_fixed_message) fixed_message_y.append(random_fixed_message) fixed_message_x.append(t) else: broadcast_day = False for i in g.nodes(): g.nodes[i]['t_'+ str(t+1)] = [] for spreader in range(NUM_NODES): if g.nodes[spreader]['exaggerate'] and is_probability_true(PROBABILITY_AMPLIFY): # only exaggerate when all true g.nodes[spreader]['is_exaggerating_now'] = True g.nodes[spreader]['exaggerate_count'] += 1 if g.nodes[spreader][t-1] < 0: message = cap(g.nodes[spreader][t-1] - random.uniform(0, AMPLIFICATION_STRENGTH)) else: message = cap(g.nodes[spreader][t-1] + random.uniform(0, AMPLIFICATION_STRENGTH)) else: g.nodes[spreader]['is_exaggerating_now'] = False message = g.nodes[spreader][t-1] # if a person has exagerated too much, swap them with a new person with new random opinion and new probability of exaggerating if g.nodes[spreader]['exaggerate_count'] > MAX_EXAGGERATE: g.nodes[spreader][t-1] = random_opinion() g.nodes[spreader]['exaggerate'] = is_probability_true(AMPLIFIER) g.nodes[spreader]['exaggerate_count'] = 0 # find who they are connected to for receiver in g.neighbors(spreader): receive_message(message, receiver, t) # update opinion (and retweet) for i in g.nodes(): if len(g.nodes[i]['t_'+ str(t)]) > 0: # received more than one opinion from others # give everyone fixed message if broadcast_day: for item in FIXED_MESSAGE: receive_message(item, i, t) # try to give them opinions in the original range # update opinion g.nodes[i]['t_'+ str(t)].append(g.nodes[i][t-1]) # include own opinion in the "average" g.nodes[i][t] = mean(g.nodes[i]['t_'+ str(t)]) else: g.nodes[i][t] = g.nodes[i][t-1] # keep old opinion y.append(g.nodes[i][t]) x.append(t) if g.nodes[i]['is_exaggerating_now']: plot_color.append('red') else: plot_color.append('#1f77b4') def parse_arguments(): global RANDOM_SEED, NUM_NODES, ERDOS_K, ERDOS_P, AMPLIFIER, CONFIDENCE_THRESHOLD, NUM_TIMESTEPS, PROBABILITY_AMPLIFY, SAVE_IMAGE, MAX_EXAGGERATE, NUM_CORRECTIONS, AMPLIFICATION_STRENGTH parser = argparse.ArgumentParser() parser.add_argument('-s', '--seed', help='enter seed number or enter -1 to use current time as seed number') parser.add_argument('-n', '--num_nodes', help='enter number of nodes') parser.add_argument('-t', '--num_timesteps', help='enter number of timesteps') parser.add_argument('-k', '--average_links_per_node', help='enter average number of links per node') parser.add_argument('-a', '--amplifier', help='enter proportion of amplifiers (between 0.0-1.0)') parser.add_argument('-e', '--confidence_threshold', help='enter confidence threshold (between 0.0-1.0)') parser.add_argument('-p', '--prob_amplify', help='enter probability of exaggerating (between 0.0-1.0)') parser.add_argument('-m', '--max_exaggerate', help='enter max number of times someone can exaggerate (integer)') parser.add_argument('-x', '--num_corrections', help='enter max number of corrections (integer)') parser.add_argument('-i', '--image', action='store_true', help='if argument is used, save image as png.') parser.add_argument('-as', '--amplification_strength', help='enter strength of amplification (between 0.0-1.0)') args = parser.parse_args() # constants seed = int(args.seed) NUM_NODES = int(args.num_nodes) NUM_TIMESTEPS = int(args.num_timesteps) AMPLIFIER = float(args.amplifier) CONFIDENCE_THRESHOLD = float(args.confidence_threshold) ERDOS_K = int(args.average_links_per_node) ERDOS_P = ERDOS_K/NUM_NODES PROBABILITY_AMPLIFY = float(args.prob_amplify) MAX_EXAGGERATE = int(args.max_exaggerate) NUM_CORRECTIONS = int(args.num_corrections) SAVE_IMAGE = args.image AMPLIFICATION_STRENGTH = float(args.amplification_strength) return seed def main(): global RANDOM_SEED, agents, x, y, plot_color seed = parse_arguments() if seed < 0: RANDOM_SEED = int(time.time()) # use current time as random seed else: RANDOM_SEED = seed random.seed(RANDOM_SEED) agents = [] x = [] y = [] plot_color = [] make_network() add_opinions() run_world() analyse() plot() if __name__ == "__main__": main()