Thank you very much for your answer! As you said, my current version of the code does adjust the weights after the simulation runs. The complete code for network construction and training is as follows:
# Gaussian fall-off of connection probability with Euclidean distance.
def distance_probability(pos1, pos2, sigma=0.1):
    """Return exp(-d^2 / (2 * sigma^2)) where d = ||pos1 - pos2||."""
    diff = np.asarray(pos1) - np.asarray(pos2)
    squared_distance = float(np.dot(diff, diff))
    return np.exp(-squared_distance / (2.0 * sigma ** 2))
# Euclidean distance between two spike trains of possibly different lengths.
def euclidean_distance(spike_train1, spike_train2):
    """Zero-pad the shorter train on the right, then return the L2 distance."""
    a = np.asarray(spike_train1, dtype=float)
    b = np.asarray(spike_train2, dtype=float)
    size = max(a.size, b.size)
    a = np.concatenate([a, np.zeros(size - a.size)])
    b = np.concatenate([b, np.zeros(size - b.size)])
    return np.linalg.norm(a - b)
# Snapshot the weights of every plastic synapse group.
def save_weights():
    """Return deep copies of all synaptic weight arrays, keyed by group name."""
    groups = {
        "S_input_liquid": S_input_liquid,
        "S_liquid_internal": S_liquid_internal,
        "S_liquid_output": S_liquid_output,
    }
    return {name: group.w[:].copy() for name, group in groups.items()}
# Restore weights previously captured by save_weights().
def load_weights(weights):
    """Write each saved weight array back into its synapse group."""
    targets = (
        (S_input_liquid, "S_input_liquid"),
        (S_liquid_internal, "S_liquid_internal"),
        (S_liquid_output, "S_liquid_output"),
    )
    for synapse_group, key in targets:
        synapse_group.w[:] = weights[key]
# Visualise how every synaptic weight evolves across training iterations.
def plot_weight_changes(weights, title):
    """Plot one line per synapse over iterations; `weights` is a list of arrays."""
    history = np.array(weights)
    plt.figure(figsize=(12, 6))
    plt.plot(history)
    plt.xlabel('Iteration')
    plt.ylabel('Weight')
    plt.title(title)
    plt.show()
# Exponential ReSuMe learning window for desired (teacher) spikes.
def resume_kernel_d(sd):
    """Return Ad * exp(-sd / taud) for sd > 0, else 0.

    Ad = 0.1 is the amplitude, taud = 20 the decay time constant (ms).
    """
    Ad, taud = 0.1, 20.0
    if sd <= 0:
        return 0
    return Ad * np.exp(-sd / taud)
# Exponential ReSuMe learning window for actual output spikes (depressing).
def resume_kernel_l(sl):
    """Return -Al * exp(-sl / taul) for sl > 0, else 0.

    Al = 0.1 is the amplitude, taul = 20 the decay time constant (ms).
    """
    Al, taul = 0.1, 20.0
    if sl <= 0:
        return 0
    return -Al * np.exp(-sl / taul)
# Function to update synaptic weights from liquid layer to output layer using the ReSuMe learning rule
def resume_output_weights():
    """Compute an offline ReSuMe update for the liquid->output weights.

    Reads the global spike monitors (spikemon_output, spikemon_liquid) and the
    per-class desired spike trains (desired_spike_trains), and evaluates the
    kernels resume_kernel_d / resume_kernel_l over the last 5 desired/actual
    spikes of each output neuron.

    Returns:
        np.ndarray of shape (N_output, N_liquid): weight changes, normalised
        by the number of liquid neurons. Note the array is (post, pre) ordered;
        align it with the synapse index ordering before adding to the weights.
    """
    ad = 0.0001  # non-Hebbian weight term (desired spike trains)
    al = 0.0001  # non-Hebbian weight term (actual spike trains)
    # Initialize weight changes
    dw = np.zeros((N_output, N_liquid))
    # Update weights using ReSuMe rule
    for j in range(N_output):  # Loop over output neurons
        Sl = np.array(spikemon_output.spike_trains()[j] / ms)
        print(f"Output Neuron{j} Sl:", Sl)
        Sd = np.unique(desired_spike_trains[j] / ms)
        print(f"Desired Neuron{j} Sd:", Sd)
        for i in range(N_liquid):  # Loop over liquid neurons
            Sin = np.array(spikemon_liquid.spike_trains()[i] / ms)
            dw_tmp = 0
            # Desired-spike contribution: potentiate inputs that fired BEFORE t_d.
            # BUGFIX: the kernel argument must be t_d - t_in (it was Sin - td,
            # i.e. t_in - t_d, which rewarded acausal input spikes that arrive
            # AFTER the desired spike and ignored the causal ones).
            for td in Sd[-5:]:
                dw_tmp += ad
                for sd in td - Sin:
                    dw_tmp += resume_kernel_d(sd)
            # Actual-spike contribution: depress inputs that fired BEFORE t_l
            # (same sign fix as above: argument is t_l - t_in).
            for tl in Sl[-5:]:
                dw_tmp -= al
                for sl in tl - Sin:
                    dw_tmp += resume_kernel_l(sl)
            # Update the weight change
            dw[j, i] = dw_tmp
    return dw / float(N_liquid)
# Define basic parameters
np.random.seed(42)  # fixed seed so connectivity and initial weights are reproducible
N_input = X_train.shape[1]  # one Poisson input neuron per feature column
N_liquid = 2000  # size of the liquid (reservoir) layer
N_output = len(np.unique(y_train))  # one readout neuron per class
single_example_time = 1 * second  # simulated time per training presentation
num_iterations = 1
run_time = num_iterations * single_example_time  # total simulated time (not used below)
# Neuron model parameters
Ee = 0.0 * mV  # excitatory reversal potential
El = -74.0 * mV  # leak reversal potential
vr = -60.0 * mV  # reset potential after a spike
vt = -54.0 * mV  # firing threshold
taum = 100.0 * ms  # membrane time constant (liquid neurons)
taue = 65.0 * ms  # excitatory conductance time constant (liquid neurons)
# STDP parameters
# NOTE(review): these STDP traces/amplitudes are defined but no synapse in
# this version uses them -- all learning here happens offline via ReSuMe.
taupre = 20.0 * ms
taupost = 20.0 * ms
gmax = 0.01  # hard upper bound on synaptic weights
dApre = 0.01
dApost = -dApre * taupre / taupost * 1.05  # slight depression bias (classic STDP setup)
dApost *= gmax
dApre *= gmax
# Define neuron equations
# Conductance-based leaky integrate-and-fire dynamics for the liquid layer.
eqs_liquid = """
dv/dt = (ge * (Ee - v) + El - v)/taum : volt
dge/dt = -ge/taue : 1
"""
# Output neurons use faster, hard-coded time constants (50 ms membrane, 1 ms synapse).
eqs_output = """
dv/dt = (ge * (Ee - v) + El - v)/(50 * ms) : volt
dge/dt = -ge/(1 * ms) : 1
"""
# Create input groups
input_neurons = PoissonGroup(N_input, 0 * Hz)  # rates are assigned per example in the loop
# Create teacher groups
# NOTE(review): in this version the teacher group is monitored but never
# synaptically connected to anything -- its spikes have no effect; verify
# whether that is intended.
teacher_neurons = SpikeGeneratorGroup(N_output, [], [] * ms)
# Define liquid neuron groups
# NOTE(review): v is never initialised; Brian2 defaults state variables to 0,
# and 0 mV is above vt (-54 mV), so every neuron fires once at t = 0.
# Consider setting v = vr at the start (same for the output group).
liquid_neurons = NeuronGroup(N_liquid, eqs_liquid, threshold="v > vt", reset="v = vr", method="euler")
# Define output neuron groups
output_neurons = NeuronGroup(N_output, eqs_output, threshold="v > vt", reset="v = vr", method="euler")
# Define synapses between input layer and liquid layer
S_input_liquid = Synapses(input_neurons, liquid_neurons, "w : 1", on_pre="ge += w")
S_input_liquid.connect()  # all-to-all
S_input_liquid.w = "rand() * gmax"  # random initial weights in [0, gmax)
# Define synapses within the liquid layer based on distance
S_liquid_internal = Synapses(liquid_neurons, liquid_neurons, "w : 1", on_pre="ge_post += w")
# Assign positions to liquid neurons in 3D
positions = np.random.rand(N_liquid, 3)  # Uniformly distributed positions in a 3D space
# Connect based on distance probability in 3D
# NOTE(review): a 2000x2000 Python loop with one .connect() call per pair is
# very slow in Brian2; a single vectorised connect(p='...') with a distance
# expression would be far faster.
for i in range(N_liquid):
    for j in range(N_liquid):
        if i != j:
            prob = distance_probability(positions[i], positions[j])
            if np.random.rand() < prob:
                S_liquid_internal.connect(i=i, j=j)
S_liquid_internal.w = "rand() * gmax"
# Define synapses between liquid layer and output layer
S_liquid_output = Synapses(liquid_neurons, output_neurons, "w : 1", on_pre="ge_post += w")
S_liquid_output.connect()  # all-to-all: N_liquid * N_output synapses
S_liquid_output.w = "rand() * gmax"
# Create monitors
spikemon_input = SpikeMonitor(input_neurons)
spikemon_teacher = SpikeMonitor(teacher_neurons)
spikemon_liquid = SpikeMonitor(liquid_neurons)
spikemon_output = SpikeMonitor(output_neurons)
statemon_liquid = StateMonitor(liquid_neurons, ['v', 'ge'], record=True, dt=1 * ms)
statemon_output = StateMonitor(output_neurons, ['v', 'ge'], record=True, dt=1 * ms)
# statemon_input_liquid = StateMonitor(S_liquid_internal, "w", record=True)
# statemon_liquid_output = StateMonitor(S_liquid_output, "w", record=True)
# Define desired spike trains
desired_spikes = [750, 800, 850, 900, 950] * ms  # 5 teacher spikes late in the 1 s run
# Store the initial state of the network (restored at the top of every iteration)
store()
# Lists to store weights over iterations
input_liquid_weights = []
liquid_internal_weights = []
liquid_output_weights = []
# Record the initial weights
input_liquid_weights.append(S_input_liquid.w[:].copy())
liquid_internal_weights.append(S_liquid_internal.w[:].copy())
liquid_output_weights.append(S_liquid_output.w[:].copy())
# Lists to store Euclidean distances for each output neuron
distances = {i: [] for i in range(N_output)}
iterations = []
for iteration in range(num_iterations):
    print(f"Iteration {iteration + 1}/{num_iterations}")
    # Restore the original state of the network (clock, membrane state AND weights)
    restore()
    # Assign values to the spike rates of input neurons
    # NOTE(review): only training example 0 is ever presented.
    input_spike_rates = X_train.iloc[0, :].values.astype(float) * 108.0
    input_neurons.rates = input_spike_rates * Hz
    # Assign desired spike trains of teacher neurons
    teacher = y_train.iloc[0].astype(int)
    teacher_neurons.set_spikes([teacher] * len(desired_spikes), desired_spikes)
    desired_spike_trains = np.zeros((N_output, len(desired_spikes))) * ms
    desired_spike_trains[teacher,:] = desired_spikes
    # NOTE(review): non-target rows stay all zeros, so resume_output_weights()
    # sees t = 0 ms as a "desired" spike for every non-target output neuron.
    # Load weights from previous iteration (restore() above reset them)
    if iteration > 0:
        load_weights(saved_weights)
    # Run the simulation
    run(single_example_time, report="text")
    # Update weights using ReSuMe
    dw = resume_output_weights()
    # NOTE(review): dw has shape (N_output, N_liquid) = (post, pre), but a full
    # Brian2 connect() orders synapses with the presynaptic index varying
    # slowest (i-major), so flatten() here likely misaligns dw with w --
    # check S_liquid_output.i[:]/.j[:]; dw.T.flatten() is probably intended.
    dw_flattened = dw.flatten()
    # print("dw_flattened shape:", dw_flattened.shape)
    # # Ensure shapes match
    # assert S_liquid_output.w[:].shape == dw_flattened.shape, "Shapes do not match after flattening dw"
    S_liquid_output.w[:] += dw_flattened
    S_liquid_output.w[:] = np.clip(S_liquid_output.w[:], 0, gmax)  # Limit the weights to [0, gmax]
    # Record the weights
    input_liquid_weights.append(S_input_liquid.w[:].copy())
    liquid_internal_weights.append(S_liquid_internal.w[:].copy())
    liquid_output_weights.append(S_liquid_output.w[:].copy())
    # Save current weights
    saved_weights = save_weights()
    # Calculate Euclidean distance
    for k in range(N_output):
        output_spike_trains = np.array(spikemon_output.spike_trains()[k] / ms)  # Remove units
        desired_single_spike_trains = desired_spikes / ms  # Remove units
        # NOTE(review): this compares the FIRST five output spikes with desired
        # spikes at 750-950 ms; early spikes can never match that window --
        # the LAST five spikes ([-5:]) may be what was intended.
        distance = euclidean_distance(output_spike_trains[:5], desired_single_spike_trains[:5])
        distances[k].append(distance)
        # Debugging info
        print(f"Output neuron{k}:")
        print("output spike trains:", output_spike_trains)
        print("desired spike trains", desired_single_spike_trains)
        print(distance)
    iterations.append(iteration)
By contrast, my previous version of the code modified the weights online, while the simulation was running. I have attached that version of the code below as well.
# Define basic parameters
np.random.seed(42)  # fixed seed for reproducible connectivity and weights
N_input = X_train.shape[1]  # one Poisson input neuron per feature column
N_liquid = 2000  # size of the liquid (reservoir) layer
N_output = len(np.unique(y_train))  # one readout neuron per class
single_example_time = 1 * second  # simulated time per presentation
# Neuron model parameters
Ee = 0.0 * mV  # excitatory reversal potential
El = -74.0 * mV  # leak reversal potential
v_reset = -60.0 * mV  # reset potential after a spike
v_thresh = -54.0 * mV  # firing threshold
taum = 100.0 * ms
taue = 50.0 * ms
# STDP parameters (used by the input->liquid synapses below)
taupre = 20.0 * ms
taupost = 20.0 * ms
gmax = 0.01  # hard upper bound on synaptic weights
dApre = 0.01
dApost = -dApre * taupre / taupost * 1.05  # slight depression bias
dApost *= gmax
dApre *= gmax
# ReSuMe parameters (used by the liquid->output / teacher synapses below)
taul = 20.0 * ms  # decay of the actual-spike trace Al
taud = 20.0 * ms  # decay of the desired-spike trace Ad
dAd = 0.01
dAl = 0.01
dAl *= gmax
dAd *= gmax
# Define liquid neuron equations
# NOTE(review): taum_l / taue_l / taum_o / taue_o are not defined anywhere in
# this snippet (only taum and taue are) -- Brian2 will fail to resolve them
# unless they are defined elsewhere; verify.
eqs_neurons_liquid = """
dv/dt = (ge * (Ee - v) + El - v) / taum_l : volt
dge/dt = -ge / taue_l : 1
"""
# Define output neuron equations
eqs_neurons_output = """
dv/dt = (ge * (Ee - v) + El - v) / taum_o : volt
dge/dt = -ge / taue_o : 1
"""
# Create input groups
input_groups = PoissonGroup(N_input, 0 * Hz)  # rates set per example in the training loop
# Create teacher groups
teacher_groups = SpikeGeneratorGroup(N_output, [], [] * ms)  # teacher spikes injected per iteration
# Define liquid neuron groups
# NOTE(review): v is never initialised; Brian2 defaults state variables to 0,
# and 0 mV is above v_thresh (-54 mV), so every neuron fires once at t = 0.
# Consider neurons_liquid.v = v_reset (same for the output group).
neurons_liquid = NeuronGroup(
    N_liquid,
    eqs_neurons_liquid,
    threshold="v > v_thresh",
    reset="v = v_reset",
    method="euler",
)
# Define output neuron groups
neurons_output = NeuronGroup(
    N_output,
    eqs_neurons_output,
    threshold="v > v_thresh",
    reset="v = v_reset",
    method="euler",
)
# Define synapses between input layer and liquid layer
# Standard online pair-based STDP: Apre/Apost are exponentially decaying
# traces updated on pre/post spikes (Brian2's canonical formulation).
S_input_to_liquid = Synapses(
    input_groups,
    neurons_liquid,
    """
w : 1
dApre/dt = -Apre / taupre : 1 (event-driven)
dApost/dt = -Apost / taupost : 1 (event-driven)
""",
    on_pre="""ge += w
Apre += dApre
w = clip(w + Apost, 0, gmax)""",
    on_post="""Apost += dApost
w = clip(w + Apre, 0, gmax)""",
)
S_input_to_liquid.connect()  # all-to-all
S_input_to_liquid.w = "rand() * gmax"  # random initial weights in [0, gmax)
# Define synapses within the liquid layer based on distance
S_liquid_internal = Synapses(
    neurons_liquid, neurons_liquid, "w : 1", on_pre="ge_post += w"
)
# Assign positions to liquid neurons in 3D
positions = np.random.rand(N_liquid, 3)  # Uniformly distributed positions in a 3D space
# Connect based on distance probability in 3D
# NOTE(review): 2000x2000 pairwise loop with one .connect() call per pair is
# very slow in Brian2; a vectorised connect(p='...') would be far faster.
for i in range(N_liquid):
    for j in range(N_liquid):
        if i != j:
            prob = distance_probability(positions[i], positions[j])
            if np.random.rand() < prob:
                S_liquid_internal.connect(i=i, j=j)
S_liquid_internal.w = "rand() * gmax"
# Define synapses between liquid layer and output layer
# Al tracks the *actual* output spikes; Ad (shared below) the *desired* ones.
# NOTE(review): on_pre updates no eligibility trace, so the on_post weight
# changes here do not depend on presynaptic (liquid) spike timing at all --
# the Hebbian part of ReSuMe is missing. Typically a presynaptic trace is
# incremented in on_pre and read in on_post.
S_liquid_to_output = Synapses(
    neurons_liquid,
    neurons_output,
    """
w : 1
dAl/dt = -Al / taul : 1 (event-driven)
dAd/dt = -Ad / taud : 1 (event-driven)
""",
    on_pre="""ge_post += w""",
    on_post="""Al += dAl; w = clip(w - Al, 0, gmax)""",
)
S_liquid_to_output.connect()
S_liquid_to_output.w = "rand() * gmax"
# Define synapses between liquid layer and teacher layer
# Teacher spikes potentiate the (shared) liquid->output weights via Ad.
S_liquid_to_teachers = Synapses(
    neurons_liquid,
    teacher_groups,
    " ",
    on_post="""Ad += dAd; w = clip(w + Ad, 0, gmax)""",
)
S_liquid_to_teachers.connect()
# NOTE(review): this add_reference trick assumes both synapse groups have
# identical size and identical (i, j) index ordering -- true here because
# both use a full connect() over the same shapes, but fragile; verify if
# the connectivity ever changes.
S_liquid_to_teachers.variables.add_reference("w", S_liquid_to_output, "w")
S_liquid_to_teachers.variables.add_reference("Al", S_liquid_to_output, "Al")
S_liquid_to_teachers.variables.add_reference("Ad", S_liquid_to_output, "Ad")
# Create monitors
spikemon_teacher = SpikeMonitor(teacher_groups)
spikemon_output = SpikeMonitor(neurons_output)
# Record all state variables of the output neurons, sampled once per second
state_monitor_output = StateMonitor(neurons_output, True, record=True, dt=1 * second)
state_monitor_liquid_to_output = StateMonitor(
    S_liquid_to_output, "w", record=True, dt=1 * second
)
# Lists to store Euclidean distances for each output neuron
distances = {i: [] for i in range(N_output)}
iterations = []
# Define desired spike trains (one teacher spike every 100 ms of the 1 s run)
desired_spikes = [0, 100, 200, 300, 400, 500, 600, 700, 800, 900] * ms
# Run multiple iterations of training
num_iterations = 1000
input_groups.rates = 0 * Hz
run(0 * second)  # zero-length run to build the network before the loop
for iteration in range(num_iterations):
    # NOTE(review): only training example 0 is ever presented.
    input_spike_rates = X_train.iloc[0, :].values.astype(float) * 108.0
    input_groups.rates = input_spike_rates * Hz
    teacher = y_train.iloc[0].astype(int)
    # NOTE(review): teacher spikes are offset by 1200 ms per iteration, but
    # each run() advances the clock by only 1000 ms (single_example_time).
    # From iteration 1 onward the teacher spikes drift out of the simulated
    # window, so teacher spikes are progressively never delivered.
    teacher_spikes = desired_spikes + 1200 * iteration * ms
    teacher_groups.set_spikes([teacher] * len(teacher_spikes), teacher_spikes)
    run(single_example_time, report="text")
    # Calculate Euclidean distance
    for k in range(N_output):
        # spike_trains() accumulates every spike since t = 0, not just this iteration
        output_spike_trains = np.array(
            spikemon_output.spike_trains()[k] / ms
        )  # Convert to ms and remove units
        desired_spike_trains = desired_spikes / ms  # Convert to ms and remove units
        # Normalize spike times so that each sequence starts from 0
        # NOTE(review): subtracting both the first-ever spike time AND the
        # iteration offset double-corrects; after a few iterations this
        # typically drives all entries negative and the filtered train is empty.
        if len(output_spike_trains) > 0:
            output_spike_trains -= output_spike_trains[0]
        output_spike_trains -= iteration * 1200
        actual_output_spike_trains = output_spike_trains[output_spike_trains > 0]
        print("output spike trains:", actual_output_spike_trains)
        print("desired spike trains", desired_spike_trains)
        distance = euclidean_distance(actual_output_spike_trains, desired_spike_trains)
        distances[k].append(distance)
    iterations.append(iteration)
But with either version I cannot get a satisfactory result — the output is far from what I expect — so I would like to know whether the problem lies in my parameter settings or in my code logic.
Thank you very much again for taking the time out of your busy schedule to answer my questions.