Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
Zhang, Tong
Online Model App
Commits
5a832bc7
Commit
5a832bc7
authored
Aug 09, 2021
by
Tong Zhang
Browse files
ENH: Drive physics model update and viz with DAQT.
parent
f4edac4c
Changes
1
Hide whitespace changes
Inline
Side-by-side
src/myApp/app.py
View file @
5a832bc7
...
...
@@ -14,6 +14,7 @@ Show the available templates:
>>> makeBasePyQtApp -l
"""
import
sys
from
functools
import
partial
from
PyQt5.QtCore
import
pyqtSignal
from
PyQt5.QtCore
import
pyqtSlot
...
...
@@ -28,10 +29,11 @@ from mpl4qt.widgets import MatplotlibBaseWidget
from
phantasy
import
MachinePortal
from
phantasy_ui
import
BaseAppForm
from
phantasy_ui
import
milli_sleep
from
phantasy_ui
import
delayed_exec
from
phantasy_ui
import
get_save_filename
from
phantasy_ui.widgets
import
ProbeWidget
from
phantasy_ui.widgets
import
LatticeWidget
from
phantasy_ui.widgets
import
DataAcquisitionThread
as
DAQT
from
phantasy_apps.allison_scanner.data
import
draw_beam_ellipse_with_params
from
.utils
import
ResultsModel
...
...
@@ -57,6 +59,11 @@ class MyAppWindow(BaseAppForm, Ui_MainWindow):
# update layout drawings
update_layout
=
pyqtSignal
()
# data updated 1: (s, x0, y0, rx, ry)
data_updated1
=
pyqtSignal
(
tuple
)
# data updated 2: dict of Twiss X, dict of Twiss Y
data_updated2
=
pyqtSignal
(
dict
,
dict
)
def
__init__
(
self
,
version
,
**
kws
):
super
(
self
.
__class__
,
self
).
__init__
()
...
...
@@ -83,14 +90,17 @@ class MyAppWindow(BaseAppForm, Ui_MainWindow):
self
.
field_name_cbb
.
currentTextChanged
.
connect
(
self
.
on_fname_changed
)
self
.
elemlist_cbb
.
currentTextChanged
.
connect
(
self
.
on_target_element_changed
)
#
self
.
data_updated1
.
connect
(
self
.
on_update_data1
)
self
.
data_updated2
.
connect
(
self
.
on_update_data2
)
#
self
.
_size_factor
=
self
.
size_factor_sbox
.
value
()
self
.
ellipse_size_factor_changed
.
connect
(
self
.
draw_ellipse
)
# initial vars for FLAME model
self
.
results
=
None
self
.
last_bs
=
None
self
.
fm
=
None
self
.
updater
=
None
# simulator
# Dict of ProbeWidget for selected element and target element
self
.
_probe_widgets_dict
=
{}
...
...
@@ -141,7 +151,8 @@ class MyAppWindow(BaseAppForm, Ui_MainWindow):
self
.
lattice_load_window
.
load_btn
.
clicked
.
emit
()
loop
.
exec_
()
# auto xyscale (ellipse drawing)
self
.
auto_limits
()
delayed_exec
(
self
.
actionUpdate
.
triggered
.
emit
,
100
)
delayed_exec
(
self
.
auto_limits
,
1000
)
def
__init_envelope_plot
(
self
):
"""Initialize plot area for beam envelope.
...
...
@@ -193,7 +204,6 @@ class MyAppWindow(BaseAppForm, Ui_MainWindow):
# 2. set field name cbb with the first item
self
.
elem_selected
=
self
.
__mp
.
get_elements
(
name
=
name
)[
0
]
# milli_sleep(500)
self
.
field_name_cbb
.
currentTextChanged
.
disconnect
()
self
.
field_name_cbb
.
clear
()
self
.
field_name_cbb
.
addItems
(
self
.
elem_selected
.
fields
)
...
...
@@ -210,45 +220,30 @@ class MyAppWindow(BaseAppForm, Ui_MainWindow):
"""
self
.
fld_selected
.
value
=
val
# update simulation
self
.
__lat
.
sync_settings
()
_
,
fm
=
self
.
__lat
.
run
()
self
.
fm
=
fm
self
.
results
,
_
=
fm
.
run
(
monitor
=
'all'
)
r
,
_
=
fm
.
run
(
monitor
=
[
self
.
elemlist_cbb
.
currentText
()])
if
r
!=
[]:
self
.
last_bs
=
r
[
0
][
-
1
]
# update drawing
self
.
update_drawing
()
def
update_drawing
(
self
):
"""This is the routine to update the figure with the updated drawing.
Here I'm drawing the beam envelop along the entire beamline, try to
replace with your routine for beam ellipse drawing.
"""
# update online model (once)
self
.
actionUpdate
.
triggered
.
emit
()
@
pyqtSlot
(
tuple
)
def
on_update_data1
(
self
,
t1
):
s
,
x0
,
y0
,
rx
,
ry
=
t1
self
.
draw_envelope
(
s
,
rx
,
ry
)
self
.
draw_trajectory
(
s
,
x0
,
y0
)
@
pyqtSlot
(
dict
,
dict
)
def
on_update_data2
(
self
,
d1
,
d2
):
self
.
params_x
,
self
.
params_y
=
d1
,
d2
self
.
draw_ellipse
()
self
.
draw_envelope
()
self
.
draw_trajectory
()
def
draw_envelope
(
self
):
def
draw_envelope
(
self
,
pos
,
xrms
,
yrms
):
"""Draw beam envelop onto the figure area.
"""
results_dict
=
self
.
fm
.
collect_data
(
self
.
results
,
'pos'
,
'xrms'
,
'yrms'
)
pos
=
results_dict
[
'pos'
]
+
self
.
__z0
xrms
=
results_dict
[
'xrms'
]
yrms
=
results_dict
[
'yrms'
]
for
line_id
,
urms
in
zip
((
0
,
1
),
(
xrms
,
yrms
)):
self
.
envelope_plot
.
setLineID
(
line_id
)
self
.
envelope_plot
.
update_curve
(
pos
,
urms
)
def
draw_trajectory
(
self
):
def
draw_trajectory
(
self
,
pos
,
xcen
,
ycen
):
"""Draw beam centroid trajectory onto the figure area.
"""
results_dict
=
self
.
fm
.
collect_data
(
self
.
results
,
'pos'
,
'xcen'
,
'ycen'
)
pos
=
results_dict
[
'pos'
]
+
self
.
__z0
xcen
=
results_dict
[
'xcen'
]
ycen
=
results_dict
[
'ycen'
]
for
line_id
,
ucen
in
zip
((
0
,
1
),
(
xcen
,
ycen
)):
self
.
trajectory_plot
.
setLineID
(
line_id
)
self
.
trajectory_plot
.
update_curve
(
pos
,
ucen
)
...
...
@@ -257,18 +252,7 @@ class MyAppWindow(BaseAppForm, Ui_MainWindow):
def
draw_ellipse
(
self
):
"""Draw x and y beam ellipse onto the figure area.
"""
#
s
=
self
.
last_bs
#
vals_x
=
(
s
.
xcen
,
s
.
xpcen
,
s
.
xrms
,
s
.
xprms
,
s
.
xemittance
,
s
.
xnemittance
,
s
.
xtwiss_alpha
,
s
.
xtwiss_beta
,
(
s
.
xtwiss_alpha
**
2
+
1
)
/
s
.
xtwiss_beta
,
1
)
vals_y
=
(
s
.
ycen
,
s
.
ypcen
,
s
.
yrms
,
s
.
yprms
,
s
.
yemittance
,
s
.
ynemittance
,
s
.
ytwiss_alpha
,
s
.
ytwiss_beta
,
(
s
.
ytwiss_alpha
**
2
+
1
)
/
s
.
ytwiss_beta
,
1
)
params_x
=
dict
(
zip
(
TWISS_KEYS_X
,
vals_x
))
params_y
=
dict
(
zip
(
TWISS_KEYS_Y
,
vals_y
))
params_x
,
params_y
=
self
.
params_x
,
self
.
params_y
self
.
_plot_ellipse
(
self
.
x_ellipse_plot
,
params_x
,
color
=
'b'
,
...
...
@@ -329,14 +313,7 @@ class MyAppWindow(BaseAppForm, Ui_MainWindow):
elem
=
self
.
__lat
[
ename
]
self
.
family_lineEdit
.
setText
(
elem
.
family
)
self
.
pos_lineEdit
.
setText
(
f
"
{
elem
.
sb
+
self
.
__z0
:.
3
f
}
m"
)
r
,
_
=
self
.
fm
.
run
(
monitor
=
[
ename
])
if
r
==
[]:
QMessageBox
.
warning
(
self
,
"Select Element"
,
"Selected element cannot be located in model, probably for splitable element, select the closest one."
,
QMessageBox
.
Ok
,
QMessageBox
.
Ok
)
return
self
.
last_bs
=
r
[
0
][
-
1
]
self
.
draw_ellipse
()
self
.
actionUpdate
.
triggered
.
emit
()
def
_show_results
(
self
,
data
):
m
=
ResultsModel
(
self
.
twiss_results_treeView
,
data
)
...
...
@@ -417,14 +394,12 @@ class MyAppWindow(BaseAppForm, Ui_MainWindow):
def
on_auto_xlim
(
self
):
"""Auto set xlimit.
"""
x0
,
x1
=
0
,
1
x0
,
x1
=
0
,
0
for
o
in
self
.
ellipse_area
.
findChildren
(
MatplotlibBaseWidget
):
o
.
set_autoscale
(
'x'
)
x0_
,
x1_
=
o
.
axes
.
get_xlim
()
if
x0_
<
x0
:
x0
=
x0_
if
x1_
>
x1
:
x1
=
x1_
x0
=
min
(
x0_
,
x0
)
x1
=
max
(
x1_
,
x1
)
self
.
xlim_x1_lineEdit
.
setText
(
f
'
{
x0
:.
1
f
}
'
)
self
.
xlim_x2_lineEdit
.
setText
(
f
'
{
x1
:.
1
f
}
'
)
...
...
@@ -432,14 +407,12 @@ class MyAppWindow(BaseAppForm, Ui_MainWindow):
def
on_auto_ylim
(
self
):
"""Auto set ylimit.
"""
y0
,
y1
=
0
,
1
y0
,
y1
=
0
,
0
for
o
in
self
.
ellipse_area
.
findChildren
(
MatplotlibBaseWidget
):
o
.
set_autoscale
(
'y'
)
y0_
,
y1_
=
o
.
axes
.
get_ylim
()
if
y0_
<
y0
:
y0
=
y0_
if
y1_
>
y1
:
y1
=
y1_
y0
=
min
(
y0_
,
y0
)
y1
=
max
(
y1_
,
y1
)
self
.
ylim_y1_lineEdit
.
setText
(
f
'
{
y0
:.
1
f
}
'
)
self
.
ylim_y2_lineEdit
.
setText
(
f
'
{
y1
:.
1
f
}
'
)
...
...
@@ -533,6 +506,82 @@ class MyAppWindow(BaseAppForm, Ui_MainWindow):
self
.
elem_name_cbb
.
currentTextChanged
.
connect
(
self
.
on_elem_name_changed
)
self
.
elem_name_cbb
.
currentTextChanged
.
emit
(
self
.
elem_name_cbb
.
currentText
())
@
pyqtSlot
(
bool
)
def
onAutoUpdateModel
(
self
,
toggled
):
"""Auto update simulation.
"""
print
(
"Update model at 1 Hz."
)
self
.
_sim_is_running
()
def
_sim_is_running
(
self
):
try
:
r
=
self
.
updater
.
isRunning
()
except
:
r
=
False
finally
:
return
r
@
pyqtSlot
()
def
onUpdateModel
(
self
):
"""Update simulation.
"""
if
self
.
_sim_is_running
():
return
self
.
updater
=
DAQT
(
daq_func
=
partial
(
self
.
update_single
,
self
.
__lat
,
self
.
elemlist_cbb
.
currentText
()),
daq_seq
=
range
(
1
))
self
.
updater
.
daqStarted
.
connect
(
partial
(
self
.
set_widgets_status
,
"START"
))
self
.
updater
.
resultsReady
.
connect
(
self
.
on_updater_results_ready
)
self
.
updater
.
finished
.
connect
(
partial
(
self
.
set_widgets_status
,
"STOP"
))
self
.
updater
.
start
()
def
update_single
(
self
,
lat
,
target_ename
,
iiter
):
lat
.
sync_settings
()
_
,
fm
=
lat
.
run
()
results
,
_
=
fm
.
run
(
monitor
=
'all'
)
r
,
_
=
fm
.
run
(
monitor
=
[
target_ename
])
return
results
,
r
,
fm
def
on_updater_results_ready
(
self
,
res
):
results
,
r
,
fm
=
res
[
0
]
# pos, xrms, yrms, xcen, ycen, twiss parameters
self
.
fm
=
fm
# s, x0, y0, rx, ry
r1_
=
fm
.
collect_data
(
results
,
'pos'
,
'xcen'
,
'ycen'
,
'xrms'
,
'yrms'
)
pos
=
r1_
[
'pos'
]
+
self
.
__z0
xcen
,
ycen
=
r1_
[
'xcen'
],
r1_
[
'ycen'
]
xrms
,
yrms
=
r1_
[
'xrms'
],
r1_
[
'yrms'
]
self
.
data_updated1
.
emit
((
pos
,
xcen
,
ycen
,
xrms
,
yrms
))
#
if
r
==
[]:
QMessageBox
.
warning
(
self
,
"Select Element"
,
"Selected element cannot be located in model, probably for splitable element, select the closest one."
,
QMessageBox
.
Ok
,
QMessageBox
.
Ok
)
else
:
self
.
__update_twiss_params
(
r
)
def
__update_twiss_params
(
self
,
r
):
s
=
r
[
0
][
-
1
]
vals_x
=
(
s
.
xcen
,
s
.
xpcen
,
s
.
xrms
,
s
.
xprms
,
s
.
xemittance
,
s
.
xnemittance
,
s
.
xtwiss_alpha
,
s
.
xtwiss_beta
,
(
s
.
xtwiss_alpha
**
2
+
1
)
/
s
.
xtwiss_beta
,
1
)
vals_y
=
(
s
.
ycen
,
s
.
ypcen
,
s
.
yrms
,
s
.
yprms
,
s
.
yemittance
,
s
.
ynemittance
,
s
.
ytwiss_alpha
,
s
.
ytwiss_beta
,
(
s
.
ytwiss_alpha
**
2
+
1
)
/
s
.
ytwiss_beta
,
1
)
params_x
=
dict
(
zip
(
TWISS_KEYS_X
,
vals_x
))
params_y
=
dict
(
zip
(
TWISS_KEYS_Y
,
vals_y
))
self
.
data_updated2
.
emit
(
params_x
,
params_y
)
def
set_widgets_status
(
self
,
status
):
olist1
=
(
self
.
actionUpdate
,
self
.
actionAuto_Update
,
)
olist2
=
()
if
status
!=
"START"
:
[
o
.
setEnabled
(
True
)
for
o
in
olist1
]
[
o
.
setEnabled
(
False
)
for
o
in
olist2
]
else
:
[
o
.
setEnabled
(
False
)
for
o
in
olist1
]
[
o
.
setEnabled
(
True
)
for
o
in
olist2
]
if
__name__
==
"__main__"
:
from
PyQt5.QtWidgets
import
QApplication
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment